From 129394fb953475888173de5d8f864de3a2569e0e Mon Sep 17 00:00:00 2001 From: AmineChikhaoui Date: Mon, 7 May 2018 14:23:51 +0100 Subject: Support multi-part uploads for large NARs that exceed the size of 5Gb. --- src/libstore/local.mk | 2 +- src/libstore/s3-binary-cache-store.cc | 80 ++++++++++++++++++++++++++++------- 2 files changed, 65 insertions(+), 17 deletions(-) (limited to 'src/libstore') diff --git a/src/libstore/local.mk b/src/libstore/local.mk index a7279aa39..3799257f8 100644 --- a/src/libstore/local.mk +++ b/src/libstore/local.mk @@ -18,7 +18,7 @@ libstore_FILES = sandbox-defaults.sb sandbox-minimal.sb sandbox-network.sb $(foreach file,$(libstore_FILES),$(eval $(call install-data-in,$(d)/$(file),$(datadir)/nix/sandbox))) ifeq ($(ENABLE_S3), 1) - libstore_LDFLAGS += -laws-cpp-sdk-s3 -laws-cpp-sdk-core + libstore_LDFLAGS += -laws-cpp-sdk-transfer -laws-cpp-sdk-s3 -laws-cpp-sdk-core endif ifeq ($(OS), SunOS) diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 23af45209..b55ff5c6d 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,9 @@ #include #include #include +#include + +using namespace Aws::Transfer; namespace nix { @@ -169,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const Setting narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"}; const Setting lsCompression{this, "", "ls-compression", "compression method for .ls files"}; const Setting logCompression{this, "", "log-compression", "compression method for log/* files"}; + const Setting bufferSize{ + this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads. defaults to 5Mb"}; std::string bucketName; @@ -271,34 +277,76 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore const std::string & mimeType, const std::string & contentEncoding) { - auto request = - Aws::S3::Model::PutObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto stream = std::make_shared(data); - request.SetContentType(mimeType); + auto maxThreads = std::thread::hardware_concurrency(); - if (contentEncoding != "") - request.SetContentEncoding(contentEncoding); + auto executor = + std::make_shared(maxThreads); - auto stream = std::make_shared(data); + TransferManagerConfiguration transferConfig(executor.get()); - request.SetBody(stream); + transferConfig.s3Client = s3Helper.client; + transferConfig.bufferSize = bufferSize; - stats.put++; - stats.putBytes += data.size(); + if (contentEncoding != "") + transferConfig.createMultipartUploadTemplate.SetContentEncoding( + contentEncoding); + + transferConfig.uploadProgressCallback = + [&](const TransferManager *transferManager, + const std::shared_ptr + &transferHandle) + { + checkInterrupt(); + printTalkative("upload progress ('%s'): '%d' of '%d' bytes", + path, + transferHandle->GetBytesTransferred(), + transferHandle->GetBytesTotalSize()); + }; + + transferConfig.transferStatusUpdatedCallback = + [&](const TransferManager *, + const std::shared_ptr + &transferHandle) { + switch (transferHandle->GetStatus()) { + case TransferStatus::COMPLETED: + printTalkative("upload of '%s' completed", path); + stats.put++; + stats.putBytes += data.size(); + break; + case TransferStatus::IN_PROGRESS: + break; + case TransferStatus::FAILED: + throw Error("AWS error: failed to upload 's3://%s/%s'", + bucketName, path); + break; + default: + throw Error("AWS error: transfer status of 's3://%s/%s' " + "in unexpected state", + bucketName, path); + }; + }; + + std::shared_ptr transferManager = + TransferManager::Create(transferConfig); auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error uploading '%s'") % path, - s3Helper.client->PutObject(request)); + std::shared_ptr transferHandle = + transferManager->UploadFile(stream, bucketName, path, mimeType, + Aws::Map()); + + transferHandle->WaitUntilFinished(); auto now2 = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(now2 - now1).count(); + auto duration = + std::chrono::duration_cast(now2 - now1) + .count(); - printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") - % bucketName % path % data.size() % duration); + printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") % + bucketName % path % data.size() % duration); stats.putTimeMs += duration; } -- cgit v1.2.3 From 591e75cd01f1650aab5538432a8637683ad1a9d0 Mon Sep 17 00:00:00 2001 From: AmineChikhaoui Date: Mon, 7 May 2018 14:27:53 +0100 Subject: add a FIXME note to find a way to abort the multipart uploads in case the nix command is interrupted. --- src/libstore/s3-binary-cache-store.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/libstore') diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index b55ff5c6d..37da44da8 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -296,8 +296,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore transferConfig.uploadProgressCallback = [&](const TransferManager *transferManager, const std::shared_ptr - &transferHandle) - { + &transferHandle) { + //FIXME: find a way to properly abort the multipart upload. checkInterrupt(); printTalkative("upload progress ('%s'): '%d' of '%d' bytes", path, -- cgit v1.2.3 From 854c0860f4ec68e81869473cf0708529afd0486e Mon Sep 17 00:00:00 2001 From: AmineChikhaoui Date: Mon, 7 May 2018 15:07:00 +0100 Subject: share the executor between multiple copy threads. --- src/libstore/s3-binary-cache-store.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/libstore') diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 37da44da8..96673a5b0 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -281,8 +281,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore auto maxThreads = std::thread::hardware_concurrency(); - auto executor = - std::make_shared(maxThreads); + static std::shared_ptr + executor = std::make_shared(maxThreads); TransferManagerConfiguration transferConfig(executor.get()); -- cgit v1.2.3