author    Eelco Dolstra <edolstra@gmail.com>    2018-05-10 12:06:02 +0200
committer Eelco Dolstra <edolstra@gmail.com>    2018-05-10 12:06:02 +0200
commit    38def176271c97a427febaf381a32bebe6b3790b (patch)
tree      f7b71b5ebb3521f5e6d5ae4953f8b34dd1159ce5
parent    39c4d7f5b0aee228d33e5a0d83b18cff8a4b49cc (diff)
parent    854c0860f4ec68e81869473cf0708529afd0486e (diff)
Merge branch 's3-multipart-uploads' of https://github.com/AmineChikhaoui/nix
-rw-r--r--  release-common.nix                     |  2
-rw-r--r--  src/libstore/local.mk                  |  2
-rw-r--r--  src/libstore/s3-binary-cache-store.cc  | 80
3 files changed, 66 insertions(+), 18 deletions(-)
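This merge switches the S3 binary cache store from a single blocking PutObjectRequest to the AWS SDK's TransferManager, which splits large objects into concurrent multipart requests. As a minimal sketch of the TransferManager pattern the patch adopts (the bucket name, key, and file name below are hypothetical placeholders, and error handling is reduced to a single status check):

    #include <aws/core/Aws.h>
    #include <aws/core/utils/threading/Executor.h>
    #include <aws/s3/S3Client.h>
    #include <aws/transfer/TransferManager.h>
    #include <memory>
    #include <thread>

    int main()
    {
        Aws::SDKOptions options;
        Aws::InitAPI(options);
        {
            auto client = std::make_shared<Aws::S3::S3Client>();

            // One pool of worker threads drives all part uploads.
            auto executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(
                std::thread::hardware_concurrency());

            Aws::Transfer::TransferManagerConfiguration config(executor.get());
            config.s3Client = client;
            config.bufferSize = 5 * 1024 * 1024; // per-part size; 5 MiB is S3's minimum

            auto manager = Aws::Transfer::TransferManager::Create(config);

            // Hypothetical file, bucket, and key names for illustration only.
            auto handle = manager->UploadFile(
                "example.nar", "example-bucket", "nar/example.nar",
                "application/x-nix-nar", Aws::Map<Aws::String, Aws::String>());

            handle->WaitUntilFinished(); // blocks until every part has completed

            if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED)
                return 1;
        }
        Aws::ShutdownAPI(options);
        return 0;
    }

The patch below uses the stream overload of UploadFile rather than a file path, and drives progress and status reporting through the uploadProgressCallback and transferStatusUpdatedCallback hooks instead of polling the handle.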
diff --git a/release-common.nix b/release-common.nix
index 0c12bc7ce..d7fb8125f 100644
--- a/release-common.nix
+++ b/release-common.nix
@@ -61,7 +61,7 @@ rec {
++ lib.optional (stdenv.isLinux || stdenv.isDarwin) libsodium
++ lib.optional (stdenv.isLinux || stdenv.isDarwin)
(aws-sdk-cpp.override {
- apis = ["s3"];
+ apis = ["s3" "transfer"];
customMemoryManagement = false;
});
diff --git a/src/libstore/local.mk b/src/libstore/local.mk
index a7279aa39..3799257f8 100644
--- a/src/libstore/local.mk
+++ b/src/libstore/local.mk
@@ -18,7 +18,7 @@ libstore_FILES = sandbox-defaults.sb sandbox-minimal.sb sandbox-network.sb
$(foreach file,$(libstore_FILES),$(eval $(call install-data-in,$(d)/$(file),$(datadir)/nix/sandbox)))
ifeq ($(ENABLE_S3), 1)
- libstore_LDFLAGS += -laws-cpp-sdk-s3 -laws-cpp-sdk-core
+ libstore_LDFLAGS += -laws-cpp-sdk-transfer -laws-cpp-sdk-s3 -laws-cpp-sdk-core
endif
ifeq ($(OS), SunOS)
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 23af45209..96673a5b0 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -17,6 +17,7 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/utils/logging/FormattedLogSystem.h>
#include <aws/core/utils/logging/LogMacros.h>
+#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/GetBucketLocationRequest.h>
@@ -24,6 +25,9 @@
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
+#include <aws/transfer/TransferManager.h>
+
+using namespace Aws::Transfer;
namespace nix {
@@ -169,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
+ const Setting<uint64_t> bufferSize{
+ this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads. defaults to 5 MiB"};
std::string bucketName;
@@ -271,34 +277,76 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
const std::string & mimeType,
const std::string & contentEncoding)
{
- auto request =
- Aws::S3::Model::PutObjectRequest()
- .WithBucket(bucketName)
- .WithKey(path);
+ auto stream = std::make_shared<istringstream_nocopy>(data);
- request.SetContentType(mimeType);
+ auto maxThreads = std::thread::hardware_concurrency();
- if (contentEncoding != "")
- request.SetContentEncoding(contentEncoding);
+ static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
+ executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
- auto stream = std::make_shared<istringstream_nocopy>(data);
+ TransferManagerConfiguration transferConfig(executor.get());
- request.SetBody(stream);
+ transferConfig.s3Client = s3Helper.client;
+ transferConfig.bufferSize = bufferSize;
- stats.put++;
- stats.putBytes += data.size();
+ if (contentEncoding != "")
+ transferConfig.createMultipartUploadTemplate.SetContentEncoding(
+ contentEncoding);
+
+ transferConfig.uploadProgressCallback =
+ [&](const TransferManager *transferManager,
+ const std::shared_ptr<const TransferHandle>
+ &transferHandle) {
+ //FIXME: find a way to properly abort the multipart upload.
+ checkInterrupt();
+ printTalkative("upload progress ('%s'): '%d' of '%d' bytes",
+ path,
+ transferHandle->GetBytesTransferred(),
+ transferHandle->GetBytesTotalSize());
+ };
+
+ transferConfig.transferStatusUpdatedCallback =
+ [&](const TransferManager *,
+ const std::shared_ptr<const TransferHandle>
+ &transferHandle) {
+ switch (transferHandle->GetStatus()) {
+ case TransferStatus::COMPLETED:
+ printTalkative("upload of '%s' completed", path);
+ stats.put++;
+ stats.putBytes += data.size();
+ break;
+ case TransferStatus::IN_PROGRESS:
+ break;
+ case TransferStatus::FAILED:
+ throw Error("AWS error: failed to upload 's3://%s/%s'",
+ bucketName, path);
+ break;
+ default:
+ throw Error("AWS error: transfer status of 's3://%s/%s' "
+ "in unexpected state",
+ bucketName, path);
+ };
+ };
+
+ std::shared_ptr<TransferManager> transferManager =
+ TransferManager::Create(transferConfig);
auto now1 = std::chrono::steady_clock::now();
- auto result = checkAws(format("AWS error uploading '%s'") % path,
- s3Helper.client->PutObject(request));
+ std::shared_ptr<TransferHandle> transferHandle =
+ transferManager->UploadFile(stream, bucketName, path, mimeType,
+ Aws::Map<Aws::String, Aws::String>());
+
+ transferHandle->WaitUntilFinished();
auto now2 = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+ auto duration =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
+ .count();
- printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms")
- % bucketName % path % data.size() % duration);
+ printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
+ bucketName % path % data.size() % duration);
stats.putTimeMs += duration;
}
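Two design notes on the patch, for context: the 5 MiB default for buffer-size matches the minimum part size S3 accepts in a multipart upload (every part except the last must be at least 5 MiB), and the PooledThreadExecutor is deliberately static, so a single thread pool sized to std::thread::hardware_concurrency() is shared across all uploads rather than being recreated for each file.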