Diffstat (limited to 'src/libstore/s3-binary-cache-store.cc')
-rw-r--r--   src/libstore/s3-binary-cache-store.cc   178
1 file changed, 114 insertions(+), 64 deletions(-)
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index ccb71f1ee..3053f908c 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -1,15 +1,17 @@
-#include "config.h"
-
#if ENABLE_S3
-#if __linux__
+#include "s3.hh"
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include "nar-info-disk-cache.hh"
#include "globals.hh"
+#include "compression.hh"
+#include "download.hh"
+#include "istringstream_nocopy.hh"
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
+#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/GetBucketLocationRequest.h>
@@ -20,15 +22,6 @@
namespace nix {
-struct istringstream_nocopy : public std::stringstream
-{
- istringstream_nocopy(const std::string & s)
- {
- rdbuf()->pubsetbuf(
- (char *) s.data(), s.size());
- }
-};
-
struct S3Error : public Error
{
Aws::S3::S3Errors err;
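
The istringstream_nocopy helper removed here moves into its own header, istringstream_nocopy.hh (included above), so other code can reuse it. A minimal usage sketch, assuming the header keeps the interface of the struct removed above; the input file name is illustrative:

    // Wrap an existing string's buffer without copying it. The string
    // must outlive the stream: the streambuf only borrows its storage.
    std::string data = readFile("example.nar"); // hypothetical input
    auto stream = std::make_shared<istringstream_nocopy>(data);
    // request.SetBody(stream); // the AWS SDK then reads 'data' directly
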
@@ -62,21 +55,92 @@ static void initAWS()
});
}
+S3Helper::S3Helper(const string & region)
+ : config(makeConfig(region))
+ , client(make_ref<Aws::S3::S3Client>(*config))
+{
+}
+
+/* Log AWS retries. */
+class RetryStrategy : public Aws::Client::DefaultRetryStrategy
+{
+ long CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override
+ {
+ auto res = Aws::Client::DefaultRetryStrategy::CalculateDelayBeforeNextRetry(error, attemptedRetries);
+ printError("AWS error '%s' (%s), will retry in %d ms",
+ error.GetExceptionName(), error.GetMessage(), res);
+ return res;
+ }
+};
+
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(const string & region)
+{
+ initAWS();
+ auto res = make_ref<Aws::Client::ClientConfiguration>();
+ res->region = region;
+ res->requestTimeoutMs = 600 * 1000;
+ res->retryStrategy = std::make_shared<RetryStrategy>();
+ res->caFile = settings.caFile;
+ return res;
+}
+
+S3Helper::DownloadResult S3Helper::getObject(
+ const std::string & bucketName, const std::string & key)
+{
+ debug("fetching ‘s3://%s/%s’...", bucketName, key);
+
+ auto request =
+ Aws::S3::Model::GetObjectRequest()
+ .WithBucket(bucketName)
+ .WithKey(key);
+
+ request.SetResponseStreamFactory([&]() {
+ return Aws::New<std::stringstream>("STRINGSTREAM");
+ });
+
+ DownloadResult res;
+
+ auto now1 = std::chrono::steady_clock::now();
+
+ try {
+
+ auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
+ client->GetObject(request));
+
+ res.data = decodeContent(
+ result.GetContentEncoding(),
+ make_ref<std::string>(
+ dynamic_cast<std::stringstream &>(result.GetBody()).str()));
+
+ } catch (S3Error & e) {
+ if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
+ }
+
+ auto now2 = std::chrono::steady_clock::now();
+
+ res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+
+ return res;
+}
+
struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
std::string bucketName;
- ref<Aws::Client::ClientConfiguration> config;
- ref<Aws::S3::S3Client> client;
-
Stats stats;
+ S3Helper s3Helper;
+
+ std::string narinfoCompression, lsCompression, logCompression;
+
S3BinaryCacheStoreImpl(
const Params & params, const std::string & bucketName)
: S3BinaryCacheStore(params)
, bucketName(bucketName)
- , config(makeConfig())
- , client(make_ref<Aws::S3::S3Client>(*config))
+ , s3Helper(get(params, "aws-region", Aws::Region::US_EAST_1))
+ , narinfoCompression(get(params, "narinfo-compression", ""))
+ , lsCompression(get(params, "ls-compression", ""))
+ , logCompression(get(params, "log-compression", ""))
{
diskCache = getNarInfoDiskCache();
}
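
The hunk above factors client setup into S3Helper (with a logging retry strategy and a configurable region) and moves GET handling into S3Helper::getObject, which also decodes any Content-Encoding via decodeContent. A minimal usage sketch, with a hypothetical bucket and key:

    // Fetch one object through the new helper; bucket and key names
    // are hypothetical, the region matches the default used above.
    S3Helper s3(Aws::Region::US_EAST_1);
    auto res = s3.getObject("example-bucket", "nix-cache-info");
    if (res.data)
        printTalkative("fetched %d bytes in %d ms",
            res.data->size(), res.durationMs);
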
@@ -86,15 +150,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return "s3://" + bucketName;
}
- ref<Aws::Client::ClientConfiguration> makeConfig()
- {
- initAWS();
- auto res = make_ref<Aws::Client::ClientConfiguration>();
- res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
- res->requestTimeoutMs = 600 * 1000;
- return res;
- }
-
void init() override
{
if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
@@ -102,7 +157,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
/* Create the bucket if it doesn't already exist. */
// FIXME: HeadBucket would be more appropriate, but doesn't return
// an easily parsed 404 message.
- auto res = client->GetBucketLocation(
+ auto res = s3Helper.client->GetBucketLocation(
Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
if (!res.IsSuccess()) {
@@ -110,7 +165,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());
checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
- client->CreateBucket(
+ s3Helper.client->CreateBucket(
Aws::S3::Model::CreateBucketRequest()
.WithBucket(bucketName)
.WithCreateBucketConfiguration(
@@ -148,7 +203,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
stats.head++;
- auto res = client->HeadObject(
+ auto res = s3Helper.client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
.WithBucket(bucketName)
.WithKey(path));
@@ -164,13 +219,20 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return true;
}
- void upsertFile(const std::string & path, const std::string & data) override
+ void uploadFile(const std::string & path, const std::string & data,
+ const std::string & mimeType,
+ const std::string & contentEncoding)
{
auto request =
Aws::S3::Model::PutObjectRequest()
.WithBucket(bucketName)
.WithKey(path);
+ request.SetContentType(mimeType);
+
+ if (contentEncoding != "")
+ request.SetContentEncoding(contentEncoding);
+
auto stream = std::make_shared<istringstream_nocopy>(data);
request.SetBody(stream);
@@ -181,7 +243,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
auto now1 = std::chrono::steady_clock::now();
auto result = checkAws(format("AWS error uploading ‘%s’") % path,
- client->PutObject(request));
+ s3Helper.client->PutObject(request));
auto now2 = std::chrono::steady_clock::now();
@@ -193,6 +255,19 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
stats.putTimeMs += duration;
}
+ void upsertFile(const std::string & path, const std::string & data,
+ const std::string & mimeType) override
+ {
+ if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
+ uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
+ else if (lsCompression != "" && hasSuffix(path, ".ls"))
+ uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
+ else if (logCompression != "" && hasPrefix(path, "log/"))
+ uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
+ else
+ uploadFile(path, data, mimeType, "");
+ }
+
void getFile(const std::string & path,
std::function<void(std::shared_ptr<std::string>)> success,
std::function<void(std::exception_ptr exc)> failure) override
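
The new upsertFile dispatches on the file type: .narinfo, .ls and log/* files are compressed with the algorithm named by the corresponding store parameter, and that name is recorded as the object's Content-Encoding so getObject can undo it on the way back. A sketch of how those parameters might be supplied; the bucket name and "xz" values are illustrative:

    // The compression settings arrive as store parameters, matching
    // the names read in the constructor above.
    Store::Params params;
    params["narinfo-compression"] = "xz";
    params["ls-compression"] = "xz";
    params["log-compression"] = "xz";
    auto store = std::make_shared<S3BinaryCacheStoreImpl>(params, "example-bucket");
    store->init();
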
@@ -200,42 +275,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
- auto request =
- Aws::S3::Model::GetObjectRequest()
- .WithBucket(bucketName)
- .WithKey(path);
-
- request.SetResponseStreamFactory([&]() {
- return Aws::New<std::stringstream>("STRINGSTREAM");
- });
-
stats.get++;
- try {
-
- auto now1 = std::chrono::steady_clock::now();
+ auto res = s3Helper.getObject(bucketName, path);
- auto result = checkAws(format("AWS error fetching ‘%s’") % path,
- client->GetObject(request));
+ stats.getBytes += res.data ? res.data->size() : 0;
+ stats.getTimeMs += res.durationMs;
- auto now2 = std::chrono::steady_clock::now();
+ if (res.data)
+ printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
+ bucketName, path, res.data->size(), res.durationMs);
- auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
-
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
-
- printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
- % bucketName % path % res.size() % duration);
-
- stats.getBytes += res.size();
- stats.getTimeMs += duration;
-
- return std::make_shared<std::string>(res);
-
- } catch (S3Error & e) {
- if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
- throw;
- }
+ return res.data;
});
}
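
getFile still funnels its body through sync2async; a plausible sketch of that helper's shape (an assumption here, it is defined elsewhere in the binary cache store code):

    // Assumed shape of sync2async: run a synchronous body and route
    // its result or exception into the async callbacks.
    template<typename T>
    void sync2async(
        std::function<void(T)> success,
        std::function<void(std::exception_ptr)> failure,
        std::function<T()> fun)
    {
        try {
            success(fun());
        } catch (...) {
            failure(std::current_exception());
        }
    }
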
@@ -248,7 +299,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);
auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
- client->ListObjects(
+ s3Helper.client->ListObjects(
Aws::S3::Model::ListObjectsRequest()
.WithBucket(bucketName)
.WithDelimiter("/")
@@ -286,4 +337,3 @@ static RegisterStoreImplementation regStore([](
}
#endif
-#endif
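
With these changes a cache can be opened as, for example, s3://example-bucket?aws-region=eu-west-1&narinfo-compression=xz (bucket name and parameter values illustrative); the query parameters reach the constructor as the Params map read above.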