aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/download.cc28
-rw-r--r--src/libstore/download.hh2
-rw-r--r--src/libstore/s3-binary-cache-store.cc141
-rw-r--r--src/libstore/s3.hh33
-rw-r--r--src/libutil/logging.hh1
5 files changed, 142 insertions, 63 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 074e0ca66..85215439a 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -4,6 +4,7 @@
#include "hash.hh"
#include "store-api.hh"
#include "archive.hh"
+#include "s3.hh"
#include <unistd.h>
#include <fcntl.h>
@@ -480,6 +481,31 @@ struct CurlDownloader : public Downloader
std::function<void(const DownloadResult &)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
+ /* Ugly hack to support s3:// URIs. */
+ if (hasPrefix(request.uri, "s3://")) {
+ // FIXME: do this on a worker thread
+ sync2async<DownloadResult>(success, failure, [&]() {
+#ifdef ENABLE_S3
+ S3Helper s3Helper;
+ auto slash = request.uri.find('/', 5);
+ if (slash == std::string::npos)
+ throw nix::Error("bad S3 URI ‘%s’", request.uri);
+ std::string bucketName(request.uri, 5, slash - 5);
+ std::string key(request.uri, slash + 1);
+ // FIXME: implement ETag
+ auto s3Res = s3Helper.getObject(bucketName, key);
+ DownloadResult res;
+ if (!s3Res.data)
+ throw DownloadError(NotFound, fmt("S3 object ‘%s’ does not exist", request.uri));
+ res.data = s3Res.data;
+ return res;
+#else
+ throw nix::Error("cannot download ‘%s’ because Nix is not built with S3 support", request.uri);
+#endif
+ });
+ return;
+ }
+
auto item = std::make_shared<DownloadItem>(*this, request);
item->success = success;
item->failure = failure;
@@ -629,7 +655,7 @@ bool isUri(const string & s)
size_t pos = s.find("://");
if (pos == string::npos) return false;
string scheme(s, 0, pos);
- return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git";
+ return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3";
}
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 82b5d641f..bdb5011e7 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -23,7 +23,7 @@ struct DownloadRequest
struct DownloadResult
{
- bool cached;
+ bool cached = false;
std::string etag;
std::string effectiveUrl;
std::shared_ptr<std::string> data;
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index cc1b33104..ac083410b 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -1,6 +1,6 @@
#if ENABLE_S3
-#if __linux__
+#include "s3.hh"
#include "s3-binary-cache-store.hh"
#include "nar-info.hh"
#include "nar-info-disk-cache.hh"
@@ -18,15 +18,6 @@
namespace nix {
-struct istringstream_nocopy : public std::stringstream
-{
- istringstream_nocopy(const std::string & s)
- {
- rdbuf()->pubsetbuf(
- (char *) s.data(), s.size());
- }
-};
-
struct S3Error : public Error
{
Aws::S3::S3Errors err;
@@ -60,21 +51,81 @@ static void initAWS()
});
}
+S3Helper::S3Helper()
+ : config(makeConfig())
+ , client(make_ref<Aws::S3::S3Client>(*config))
+{
+}
+
+ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig()
+{
+ initAWS();
+ auto res = make_ref<Aws::Client::ClientConfiguration>();
+ res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
+ res->requestTimeoutMs = 600 * 1000;
+ return res;
+}
+
+S3Helper::DownloadResult S3Helper::getObject(
+ const std::string & bucketName, const std::string & key)
+{
+ debug("fetching ‘s3://%s/%s’...", bucketName, key);
+
+ auto request =
+ Aws::S3::Model::GetObjectRequest()
+ .WithBucket(bucketName)
+ .WithKey(key);
+
+ request.SetResponseStreamFactory([&]() {
+ return Aws::New<std::stringstream>("STRINGSTREAM");
+ });
+
+ DownloadResult res;
+
+ auto now1 = std::chrono::steady_clock::now();
+
+ try {
+
+ auto result = checkAws(fmt("AWS error fetching ‘%s’", key),
+ client->GetObject(request));
+
+ res.data = std::make_shared<std::string>(
+ dynamic_cast<std::stringstream &>(result.GetBody()).str());
+
+ } catch (S3Error & e) {
+ if (e.err != Aws::S3::S3Errors::NO_SUCH_KEY) throw;
+ }
+
+ auto now2 = std::chrono::steady_clock::now();
+
+ res.durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+
+ return res;
+}
+
+#if __linux__
+
+struct istringstream_nocopy : public std::stringstream
+{
+ istringstream_nocopy(const std::string & s)
+ {
+ rdbuf()->pubsetbuf(
+ (char *) s.data(), s.size());
+ }
+};
+
struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
std::string bucketName;
- ref<Aws::Client::ClientConfiguration> config;
- ref<Aws::S3::S3Client> client;
-
Stats stats;
+ S3Helper s3Helper;
+
S3BinaryCacheStoreImpl(
const Params & params, const std::string & bucketName)
: S3BinaryCacheStore(params)
, bucketName(bucketName)
- , config(makeConfig())
- , client(make_ref<Aws::S3::S3Client>(*config))
{
diskCache = getNarInfoDiskCache();
}
@@ -84,15 +135,6 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return "s3://" + bucketName;
}
- ref<Aws::Client::ClientConfiguration> makeConfig()
- {
- initAWS();
- auto res = make_ref<Aws::Client::ClientConfiguration>();
- res->region = Aws::Region::US_EAST_1; // FIXME: make configurable
- res->requestTimeoutMs = 600 * 1000;
- return res;
- }
-
void init() override
{
if (!diskCache->cacheExists(getUri(), wantMassQuery_, priority)) {
@@ -100,7 +142,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
/* Create the bucket if it doesn't already exists. */
// FIXME: HeadBucket would be more appropriate, but doesn't return
// an easily parsed 404 message.
- auto res = client->GetBucketLocation(
+ auto res = s3Helper.client->GetBucketLocation(
Aws::S3::Model::GetBucketLocationRequest().WithBucket(bucketName));
if (!res.IsSuccess()) {
@@ -108,7 +150,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
throw Error(format("AWS error checking bucket ‘%s’: %s") % bucketName % res.GetError().GetMessage());
checkAws(format("AWS error creating bucket ‘%s’") % bucketName,
- client->CreateBucket(
+ s3Helper.client->CreateBucket(
Aws::S3::Model::CreateBucketRequest()
.WithBucket(bucketName)
.WithCreateBucketConfiguration(
@@ -146,7 +188,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
{
stats.head++;
- auto res = client->HeadObject(
+ auto res = s3Helper.client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
.WithBucket(bucketName)
.WithKey(path));
@@ -179,7 +221,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
auto now1 = std::chrono::steady_clock::now();
auto result = checkAws(format("AWS error uploading ‘%s’") % path,
- client->PutObject(request));
+ s3Helper.client->PutObject(request));
auto now2 = std::chrono::steady_clock::now();
@@ -198,42 +240,18 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
- auto request =
- Aws::S3::Model::GetObjectRequest()
- .WithBucket(bucketName)
- .WithKey(path);
-
- request.SetResponseStreamFactory([&]() {
- return Aws::New<std::stringstream>("STRINGSTREAM");
- });
-
stats.get++;
- try {
-
- auto now1 = std::chrono::steady_clock::now();
-
- auto result = checkAws(format("AWS error fetching ‘%s’") % path,
- client->GetObject(request));
-
- auto now2 = std::chrono::steady_clock::now();
+ auto res = s3Helper.getObject(bucketName, path);
- auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
+ stats.getBytes += res.data ? res.data->size() : 0;
+ stats.getTimeMs += res.durationMs;
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+ if (res.data)
+ printTalkative("downloaded ‘s3://%s/%s’ (%d bytes) in %d ms",
+ bucketName, path, res.data->size(), res.durationMs);
- printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
- % bucketName % path % res.size() % duration);
-
- stats.getBytes += res.size();
- stats.getTimeMs += duration;
-
- return std::make_shared<std::string>(res);
-
- } catch (S3Error & e) {
- if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
- throw;
- }
+ return res.data;
});
}
@@ -246,7 +264,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
debug(format("listing bucket ‘s3://%s’ from key ‘%s’...") % bucketName % marker);
auto res = checkAws(format("AWS error listing bucket ‘%s’") % bucketName,
- client->ListObjects(
+ s3Helper.client->ListObjects(
Aws::S3::Model::ListObjectsRequest()
.WithBucket(bucketName)
.WithDelimiter("/")
@@ -281,7 +299,8 @@ static RegisterStoreImplementation regStore([](
return store;
});
+#endif
+
}
#endif
-#endif
diff --git a/src/libstore/s3.hh b/src/libstore/s3.hh
new file mode 100644
index 000000000..5d5d3475c
--- /dev/null
+++ b/src/libstore/s3.hh
@@ -0,0 +1,33 @@
+#pragma once
+
+#if ENABLE_S3
+
+#include "ref.hh"
+
+namespace Aws { namespace Client { class ClientConfiguration; } }
+namespace Aws { namespace S3 { class S3Client; } }
+
+namespace nix {
+
+struct S3Helper
+{
+ ref<Aws::Client::ClientConfiguration> config;
+ ref<Aws::S3::S3Client> client;
+
+ S3Helper();
+
+ ref<Aws::Client::ClientConfiguration> makeConfig();
+
+ struct DownloadResult
+ {
+ std::shared_ptr<std::string> data;
+ unsigned int durationMs;
+ };
+
+ DownloadResult getObject(
+ const std::string & bucketName, const std::string & key);
+};
+
+}
+
+#endif
diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh
index 3e6c4b548..3f8366479 100644
--- a/src/libutil/logging.hh
+++ b/src/libutil/logging.hh
@@ -78,6 +78,7 @@ extern Verbosity verbosity; /* suppress msgs > this */
#define printError(args...) printMsg(lvlError, args)
#define printInfo(args...) printMsg(lvlInfo, args)
+#define printTalkative(args...) printMsg(lvlTalkative, args)
#define debug(args...) printMsg(lvlDebug, args)
#define vomit(args...) printMsg(lvlVomit, args)