diff options
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 178 |
1 files changed, 114 insertions, 64 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ccb71f1ee..3053f908c 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -1,15 +1,17 @@ -#include "config.h" - #if ENABLE_S3 -#if __linux__ +#include "s3.hh" #include "s3-binary-cache-store.hh" #include "nar-info.hh" #include "nar-info-disk-cache.hh" #include "globals.hh" +#include "compression.hh" +#include "download.hh" +#include "istringstream_nocopy.hh" #include <aws/core/Aws.h> #include <aws/core/client/ClientConfiguration.h> +#include <aws/core/client/DefaultRetryStrategy.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/CreateBucketRequest.h> #include <aws/s3/model/GetBucketLocationRequest.h> @@ -20,15 +22,6 @@ namespace nix { -struct istringstream_nocopy : public std::stringstream -{ - istringstream_nocopy(const std::string & s) - { - rdbuf()->pubsetbuf( - (char *) s.data(), s.size()); - } -}; - struct S3Error : public Error { Aws::S3::S3Errors err; @@ -62,21 +55,92 @@ static void initAWS() }); } +S3Helper::S3Helper(const string & region) + : config(makeConfig(region)) + , client(make_ref<Aws::S3::S3Client>(*config)) +{ +} + +/* Log AWS retries. */ +class RetryStrategy : public Aws::Client::DefaultRetryStrategy +{ + long CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override + { + auto res = Aws::Client::DefaultRetryStrategy::CalculateDelayBeforeNextRetry(error, attemptedRetries); + printError("AWS error '%s' (%s), will retry in %d ms", + error.GetExceptionName(), error.GetMessage(), res); + return res; + } +}; + +ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region) +{ + initAWS(); + auto res = make_ref<Aws::Client::ClientConfiguration>(); + res->region = region; + res->requestTimeoutMs = 600 * 1000; + res->retryStrategy = std::make_shared<RetryStrategy>(); + res->caFile = settings.caFile; + return res; +} + +S3Helper::DownloadResult S3Helper::getObject( + const std::string & bucketName, const std::string & key) +{ + debug("fetching ‘s3://%s/%s’...", bucketName, key); + + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(key); + + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); + + DownloadResult res; + + auto now1 = std::chrono::steady_clock::now(); + + try { + + auto result = checkAws(fmt("AWS error fetching ‘%s’", key), + client->GetObject(request)); + + res.data = decodeContent( + result.GetContentEncoding(), + make_ref<std::string>( + dynamic_cast<std::stringstream &>(result.GetBody()).str())); + + } catch (S3Error & e) { + if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw; + } + + auto now2 = std::chrono::steady_clock::now(); + + res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + + return res; +} + struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { std::string bucketName; - ref<Aws::Client::ClientConfiguration> config; - ref<Aws::S3::S3Client> client; - Stats stats; + S3Helper s3Helper; + + std::string narinfoCompression, lsCompression, logCompression; + S3BinaryCacheStoreImpl( const Params & params, const std::string & bucketName) : S3BinaryCacheStore(params) , bucketName(bucketName) - , config(makeConfig()) - , client(make_ref<Aws::S3::S3Client>(*config)) + , s3Helper(get(params, "aws-region", Aws::Region::US_EAST_1)) + , narinfoCompression(get(params, "narinfo-compression", "")) + , lsCompression(get(params, "ls-compression", "")) + , logCompression(get(params, "log-compression", "")) { diskCache = getNarInfoDiskCache(); } @@ -86,15 +150,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore return "s3://" + bucketName; } - ref<Aws::Client::ClientConfiguration> makeConfig() - { - initAWS(); - auto res = make_ref<Aws::Client::ClientConfiguration>(); - res->region = Aws::Region::US_EAST_1; // FIXME: make configurable - res->requestTimeoutMs = 600 * 1000; - return res; - } - void init() override { if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) { @@ -102,7 +157,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore /* Create the bucket if it doesn't already exists. */ // FIXME: HeadBucket would be more appropriate, but doesn't return // an easily parsed 404 message. - auto res = client->GetBucketLocation( + auto res = s3Helper.client->GetBucketLocation( Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); if (!res.IsSuccess()) { @@ -110,7 +165,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage()); checkAws(format("AWS error creating bucket ‘%s’") % bucketName, - client->CreateBucket( + s3Helper.client->CreateBucket( Aws::S3::Model::CreateBucketRequest() .WithBucket(bucketName) .WithCreateBucketConfiguration( @@ -148,7 +203,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore { stats.head++; - auto res = client->HeadObject( + auto res = s3Helper.client->HeadObject( Aws::S3::Model::HeadObjectRequest() .WithBucket(bucketName) .WithKey(path)); @@ -164,13 +219,20 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore return true; } - void upsertFile(const std::string & path, const std::string & data) override + void uploadFile(const std::string & path, const std::string & data, + const std::string & mimeType, + const std::string & contentEncoding) { auto request = Aws::S3::Model::PutObjectRequest() .WithBucket(bucketName) .WithKey(path); + request.SetContentType(mimeType); + + if (contentEncoding != "") + request.SetContentEncoding(contentEncoding); + auto stream = std::make_shared<istringstream_nocopy>(data); request.SetBody(stream); @@ -181,7 +243,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore auto now1 = std::chrono::steady_clock::now(); auto result = checkAws(format("AWS error uploading ‘%s’") % path, - client->PutObject(request)); + s3Helper.client->PutObject(request)); auto now2 = std::chrono::steady_clock::now(); @@ -193,6 +255,19 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore stats.putTimeMs += duration; } + void upsertFile(const std::string & path, const std::string & data, + const std::string & mimeType) override + { + if (narinfoCompression != "" && hasSuffix(path, ".narinfo")) + uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression); + else if (lsCompression != "" && hasSuffix(path, ".ls")) + uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression); + else if (logCompression != "" && hasPrefix(path, "log/")) + uploadFile(path, *compress(logCompression, data), mimeType, logCompression); + else + uploadFile(path, data, mimeType, ""); + } + void getFile(const std::string & path, std::function<void(std::shared_ptr<std::string>)> success, std::function<void(std::exception_ptr exc)> failure) override @@ -200,42 +275,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); - - request.SetResponseStreamFactory([&]() { - return Aws::New<std::stringstream>("STRINGSTREAM"); - }); - stats.get++; - try { - - auto now1 = std::chrono::steady_clock::now(); + auto res = s3Helper.getObject(bucketName, path); - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); + stats.getBytes += res.data ? res.data->size() : 0; + stats.getTimeMs += res.durationMs; - auto now2 = std::chrono::steady_clock::now(); + if (res.data) + printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms", + bucketName, path, res.data->size(), res.durationMs); - auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); - - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); - - stats.getBytes += res.size(); - stats.getTimeMs += duration; - - return std::make_shared<std::string>(res); - - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>(); - throw; - } + return res.data; }); } @@ -248,7 +299,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker); auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName, - client->ListObjects( + s3Helper.client->ListObjects( Aws::S3::Model::ListObjectsRequest() .WithBucket(bucketName) .WithDelimiter("/") @@ -286,4 +337,3 @@ static RegisterStoreImplementation regStore([]( } #endif -#endif |