aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/s3-binary-cache-store.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-10-30 14:25:00 +0100
committerEelco Dolstra <edolstra@gmail.com>2018-10-30 14:25:00 +0100
commit9f99d62480cf7c58c0a110b180f2096b7d25adab (patch)
treee99f4a412eb90a7f23db294a74ff95f2bee57d63 /src/libstore/s3-binary-cache-store.cc
parent0163e8928c624251456adacb669a94a4adf230ff (diff)
S3BinaryCacheStore: Allow disabling multipart uploads
The use of TransferManager has several issues, including that it doesn't allow setting a Content-Encoding without a patch, and it doesn't handle exceptions in worker threads (causing termination on memory allocation failure). Fixes #2493.
Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r--src/libstore/s3-binary-cache-store.cc88
1 file changed, 57 insertions, 31 deletions
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 13ee257ba..c5c6b89b1 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -173,6 +173,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
const Setting<std::string> narinfoCompression{this, "", "narinfo-compression", "compression method for .narinfo files"};
const Setting<std::string> lsCompression{this, "", "ls-compression", "compression method for .ls files"};
const Setting<std::string> logCompression{this, "", "log-compression", "compression method for log/* files"};
+ const Setting<bool> multipartUpload{
+ this, false, "multipart-upload", "whether to use multi-part uploads"};
const Setting<uint64_t> bufferSize{
this, 5 * 1024 * 1024, "buffer-size", "size (in bytes) of each part in multi-part uploads"};
@@ -261,46 +263,70 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
executor = std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(maxThreads);
- std::call_once(transferManagerCreated, [&]() {
+ std::call_once(transferManagerCreated, [&]()
+ {
+ if (multipartUpload) {
+ TransferManagerConfiguration transferConfig(executor.get());
+
+ transferConfig.s3Client = s3Helper.client;
+ transferConfig.bufferSize = bufferSize;
+
+ transferConfig.uploadProgressCallback =
+ [](const TransferManager *transferManager,
+ const std::shared_ptr<const TransferHandle>
+ &transferHandle)
+ {
+ //FIXME: find a way to properly abort the multipart upload.
+ //checkInterrupt();
+ debug("upload progress ('%s'): '%d' of '%d' bytes",
+ transferHandle->GetKey(),
+ transferHandle->GetBytesTransferred(),
+ transferHandle->GetBytesTotalSize());
+ };
+
+ transferManager = TransferManager::Create(transferConfig);
+ }
+ });
- TransferManagerConfiguration transferConfig(executor.get());
+ auto now1 = std::chrono::steady_clock::now();
- transferConfig.s3Client = s3Helper.client;
- transferConfig.bufferSize = bufferSize;
+ if (transferManager) {
- transferConfig.uploadProgressCallback =
- [](const TransferManager *transferManager,
- const std::shared_ptr<const TransferHandle>
- &transferHandle)
- {
- //FIXME: find a way to properly abort the multipart upload.
- //checkInterrupt();
- debug("upload progress ('%s'): '%d' of '%d' bytes",
- transferHandle->GetKey(),
- transferHandle->GetBytesTransferred(),
- transferHandle->GetBytesTotalSize());
- };
+ std::shared_ptr<TransferHandle> transferHandle =
+ transferManager->UploadFile(
+ stream, bucketName, path, mimeType,
+ Aws::Map<Aws::String, Aws::String>(),
+ nullptr, contentEncoding);
- transferManager = TransferManager::Create(transferConfig);
- });
+ transferHandle->WaitUntilFinished();
- auto now1 = std::chrono::steady_clock::now();
+ if (transferHandle->GetStatus() == TransferStatus::FAILED)
+ throw Error("AWS error: failed to upload 's3://%s/%s': %s",
+ bucketName, path, transferHandle->GetLastError().GetMessage());
- std::shared_ptr<TransferHandle> transferHandle =
- transferManager->UploadFile(
- stream, bucketName, path, mimeType,
- Aws::Map<Aws::String, Aws::String>(),
- nullptr, contentEncoding);
+ if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
+ throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
+ bucketName, path);
- transferHandle->WaitUntilFinished();
+ } else {
- if (transferHandle->GetStatus() == TransferStatus::FAILED)
- throw Error("AWS error: failed to upload 's3://%s/%s': %s",
- bucketName, path, transferHandle->GetLastError().GetMessage());
+ auto request =
+ Aws::S3::Model::PutObjectRequest()
+ .WithBucket(bucketName)
+ .WithKey(path);
- if (transferHandle->GetStatus() != TransferStatus::COMPLETED)
- throw Error("AWS error: transfer status of 's3://%s/%s' in unexpected state",
- bucketName, path);
+ request.SetContentType(mimeType);
+
+ if (contentEncoding != "")
+ request.SetContentEncoding(contentEncoding);
+
+ auto stream = std::make_shared<istringstream_nocopy>(data);
+
+ request.SetBody(stream);
+
+ auto result = checkAws(fmt("AWS error uploading '%s'", path),
+ s3Helper.client->PutObject(request));
+ }
printTalkative("upload of '%s' completed", path);