diff options
-rw-r--r-- | release.nix | 9 | ||||
-rw-r--r-- | src/libstore/local.mk | 2 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 218 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.hh | 34 |
4 files changed, 261 insertions, 2 deletions
diff --git a/release.nix b/release.nix index b81571e2d..9d9923afc 100644 --- a/release.nix +++ b/release.nix @@ -71,7 +71,14 @@ let src = tarball; buildInputs = - [ curl perl bzip2 xz openssl pkgconfig sqlite boehmgc ] + [ curl perl bzip2 xz openssl pkgconfig sqlite boehmgc + + (aws-sdk-cpp.override { + apis = ["s3"]; + customMemoryManagement = false; + }) + + ] ++ lib.optional stdenv.isLinux libsodium; configureFlags = '' diff --git a/src/libstore/local.mk b/src/libstore/local.mk index 9a01596c3..15fa91b5c 100644 --- a/src/libstore/local.mk +++ b/src/libstore/local.mk @@ -8,7 +8,7 @@ libstore_SOURCES := $(wildcard $(d)/*.cc) libstore_LIBS = libutil libformat -libstore_LDFLAGS = $(SQLITE3_LIBS) -lbz2 $(LIBCURL_LIBS) $(SODIUM_LIBS) +libstore_LDFLAGS = $(SQLITE3_LIBS) -lbz2 $(LIBCURL_LIBS) $(SODIUM_LIBS) -laws-cpp-sdk-s3 -laws-cpp-sdk-core ifeq ($(OS), SunOS) libstore_LDFLAGS += -lsocket diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc new file mode 100644 index 000000000..9ac79cf41 --- /dev/null +++ b/src/libstore/s3-binary-cache-store.cc @@ -0,0 +1,218 @@ +#include "s3-binary-cache-store.hh" +#include "nar-info.hh" +#include "nar-info-disk-cache.hh" +#include "globals.hh" + +#include <aws/core/client/ClientConfiguration.h> +#include <aws/s3/S3Client.h> +#include <aws/s3/model/CreateBucketRequest.h> +#include <aws/s3/model/GetBucketLocationRequest.h> +#include <aws/s3/model/GetObjectRequest.h> +#include <aws/s3/model/HeadObjectRequest.h> +#include <aws/s3/model/PutObjectRequest.h> + +namespace nix { + +struct S3Error : public Error +{ + Aws::S3::S3Errors err; + S3Error(Aws::S3::S3Errors err, const FormatOrString & fs) + : Error(fs), err(err) { }; +}; + +/* Helper: given an Outcome<R, E>, return R in case of success, or + throw an exception in case of an error. */ +template<typename R, typename E> +R && checkAws(const FormatOrString & fs, Aws::Utils::Outcome<R, E> && outcome) +{ + if (!outcome.IsSuccess()) + throw S3Error( + outcome.GetError().GetErrorType(), + fs.s + ": " + outcome.GetError().GetMessage()); + return outcome.GetResultWithOwnership(); +} + +struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore +{ + std::string bucketName; + + ref<Aws::Client::ClientConfiguration> config; + ref<Aws::S3::S3Client> client; + + Stats stats; + + S3BinaryCacheStoreImpl(std::shared_ptr<Store> localStore, + const Path & secretKeyFile, const std::string & bucketName) + : S3BinaryCacheStore(localStore, secretKeyFile) + , bucketName(bucketName) + , config(makeConfig()) + , client(make_ref<Aws::S3::S3Client>(*config)) + { + diskCache = getNarInfoDiskCache(); + } + + std::string getUri() + { + return "s3://" + bucketName; + } + + ref<Aws::Client::ClientConfiguration> makeConfig() + { + auto res = make_ref<Aws::Client::ClientConfiguration>(); + res->region = Aws::Region::US_EAST_1; // FIXME: make configurable + res->requestTimeoutMs = 600 * 1000; + return res; + } + + void init() + { + if (!diskCache->cacheExists(getUri())) { + + /* Create the bucket if it doesn't already exists. */ + // FIXME: HeadBucket would be more appropriate, but doesn't return + // an easily parsed 404 message. + auto res = client->GetBucketLocation( + Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName)); + + if (!res.IsSuccess()) { + if (res.GetError().GetErrorType() != Aws::S3::S3Errors::NO_SUCH_BUCKET) + throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage()); + + checkAws(format("AWS error creating bucket ‘%s’") % bucketName, + client->CreateBucket( + Aws::S3::Model::CreateBucketRequest() + .WithBucket(bucketName) + .WithCreateBucketConfiguration( + Aws::S3::Model::CreateBucketConfiguration() + /* .WithLocationConstraint( + Aws::S3::Model::BucketLocationConstraint::US) */ ))); + } + + BinaryCacheStore::init(); + + diskCache->createCache(getUri()); + } + } + + const Stats & getS3Stats() + { + return stats; + } + + /* This is a specialisation of isValidPath() that optimistically + fetches the .narinfo file, rather than first checking for its + existence via a HEAD request. Since .narinfos are small, doing + a GET is unlikely to be slower than HEAD. */ + bool isValidPathUncached(const Path & storePath) + { + try { + queryPathInfo(storePath); + return true; + } catch (InvalidPath & e) { + return false; + } + } + + bool fileExists(const std::string & path) + { + stats.head++; + + auto res = client->HeadObject( + Aws::S3::Model::HeadObjectRequest() + .WithBucket(bucketName) + .WithKey(path)); + + if (!res.IsSuccess()) { + auto & error = res.GetError(); + if (error.GetErrorType() == Aws::S3::S3Errors::UNKNOWN // FIXME + && error.GetMessage().find("404") != std::string::npos) + return false; + throw Error(format("AWS error fetching ‘%s’: %s") % path % error.GetMessage()); + } + + return true; + } + + void upsertFile(const std::string & path, const std::string & data) + { + auto request = + Aws::S3::Model::PutObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + auto stream = std::make_shared<std::stringstream>(data); + + request.SetBody(stream); + + stats.put++; + stats.putBytes += data.size(); + + auto now1 = std::chrono::steady_clock::now(); + + auto result = checkAws(format("AWS error uploading ‘%s’") % path, + client->PutObject(request)); + + auto now2 = std::chrono::steady_clock::now(); + + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + + printMsg(lvlInfo, format("uploaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % data.size() % duration); + + stats.putTimeMs += duration; + } + + std::shared_ptr<std::string> getFile(const std::string & path) + { + printMsg(lvlDebug, format("fetching ‘s3://%1%/%2%’...") % bucketName % path); + + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); + + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); + + stats.get++; + + try { + + auto now1 = std::chrono::steady_clock::now(); + + auto result = checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); + + auto now2 = std::chrono::steady_clock::now(); + + auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); + + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); + + stats.getBytes += res.size(); + stats.getTimeMs += duration; + + return std::make_shared<std::string>(res); + + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; + throw; + } + } + +}; + +static RegisterStoreImplementation regStore([](const std::string & uri) -> std::shared_ptr<Store> { + if (std::string(uri, 0, 5) != "s3://") return 0; + auto store = std::make_shared<S3BinaryCacheStoreImpl>(std::shared_ptr<Store>(0), + settings.get("binary-cache-secret-key-file", string("")), + std::string(uri, 5)); + store->init(); + return store; +}); + +} diff --git a/src/libstore/s3-binary-cache-store.hh b/src/libstore/s3-binary-cache-store.hh new file mode 100644 index 000000000..0425f6bb9 --- /dev/null +++ b/src/libstore/s3-binary-cache-store.hh @@ -0,0 +1,34 @@ +#pragma once + +#include "binary-cache-store.hh" + +#include <atomic> + +namespace nix { + +class S3BinaryCacheStore : public BinaryCacheStore +{ +protected: + + S3BinaryCacheStore(std::shared_ptr<Store> localStore, + const Path & secretKeyFile) + : BinaryCacheStore(localStore, secretKeyFile) + { } + +public: + + struct Stats + { + std::atomic<uint64_t> put{0}; + std::atomic<uint64_t> putBytes{0}; + std::atomic<uint64_t> putTimeMs{0}; + std::atomic<uint64_t> get{0}; + std::atomic<uint64_t> getBytes{0}; + std::atomic<uint64_t> getTimeMs{0}; + std::atomic<uint64_t> head{0}; + }; + + const Stats & getS3Stats(); +}; + +} |