aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libstore/binary-cache-store.cc11
-rw-r--r--src/libstore/binary-cache-store.hh4
-rw-r--r--src/libstore/http-binary-cache-store.cc4
-rw-r--r--src/libstore/local-binary-cache-store.cc5
-rw-r--r--src/libstore/s3-binary-cache-store.cc37
-rw-r--r--src/libutil/serialise.hh23
6 files changed, 57 insertions, 27 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index e71240558..b791c125b 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -15,6 +15,7 @@
#include <chrono>
#include <future>
#include <regex>
+#include <fstream>
#include <nlohmann/json.hpp>
@@ -58,11 +59,10 @@ void BinaryCacheStore::init()
}
void BinaryCacheStore::upsertFile(const std::string & path,
- const std::string & data,
+ std::string && data,
const std::string & mimeType)
{
- StringSource source(data);
- upsertFile(path, source, mimeType);
+ upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType);
}
void BinaryCacheStore::getFile(const std::string & path,
@@ -279,8 +279,9 @@ void BinaryCacheStore::addToStore(const ValidPathInfo & info, Source & narSource
/* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) {
stats.narWrite++;
- FileSource source(fnTemp);
- upsertFile(narInfo->url, source, "application/x-nix-nar");
+ upsertFile(narInfo->url,
+ std::make_shared<std::fstream>(fnTemp, std::ios_base::in),
+ "application/x-nix-nar");
} else
stats.narWriteAverted++;
diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh
index 2bf8d56b4..9bcdf5901 100644
--- a/src/libstore/binary-cache-store.hh
+++ b/src/libstore/binary-cache-store.hh
@@ -36,11 +36,11 @@ public:
virtual bool fileExists(const std::string & path) = 0;
virtual void upsertFile(const std::string & path,
- Source & source,
+ std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) = 0;
void upsertFile(const std::string & path,
- const std::string & data,
+ std::string && data,
const std::string & mimeType);
/* Note: subclasses must implement at least one of the two
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index d9a292368..c1ceb08cf 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -100,11 +100,11 @@ protected:
}
void upsertFile(const std::string & path,
- Source & source,
+ std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
{
auto req = FileTransferRequest(cacheUri + "/" + path);
- req.data = std::make_shared<string>(source.drain());
+ req.data = std::make_shared<string>(StreamToSourceAdapter(istream).drain());
req.mimeType = mimeType;
try {
getFileTransfer()->upload(req);
diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc
index 3d531d3a7..87d8334d7 100644
--- a/src/libstore/local-binary-cache-store.cc
+++ b/src/libstore/local-binary-cache-store.cc
@@ -31,12 +31,13 @@ protected:
bool fileExists(const std::string & path) override;
void upsertFile(const std::string & path,
- Source & source,
- const std::string & mimeType)
+ std::shared_ptr<std::basic_iostream<char>> istream,
+ const std::string & mimeType) override
{
auto path2 = binaryCacheDir + "/" + path;
Path tmp = path2 + ".tmp." + std::to_string(getpid());
AutoDelete del(tmp, false);
+ StreamToSourceAdapter source(istream);
writeFile(tmp, source);
if (rename(tmp.c_str(), path2.c_str()))
throw SysError("renaming '%1%' to '%2%'", tmp, path2);
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 31ad4a3be..1b7dff085 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -261,12 +261,11 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
std::shared_ptr<TransferManager> transferManager;
std::once_flag transferManagerCreated;
- void uploadFile(const std::string & path, const std::string & data,
+ void uploadFile(const std::string & path,
+ std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
const std::string & contentEncoding)
{
- auto stream = std::make_shared<std::stringstream>(data);
-
auto maxThreads = std::thread::hardware_concurrency();
static std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>
@@ -306,7 +305,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
std::shared_ptr<TransferHandle> transferHandle =
transferManager->UploadFile(
- stream, bucketName, path, mimeType,
+ istream, bucketName, path, mimeType,
Aws::Map<Aws::String, Aws::String>(),
nullptr /*, contentEncoding */);
@@ -332,9 +331,7 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
if (contentEncoding != "")
request.SetContentEncoding(contentEncoding);
- auto stream = std::make_shared<std::stringstream>(data);
-
- request.SetBody(stream);
+ request.SetBody(istream);
auto result = checkAws(fmt("AWS error uploading '%s'", path),
s3Helper.client->PutObject(request));
@@ -346,26 +343,34 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
.count();
- printInfo(format("uploaded 's3://%1%/%2%' (%3% bytes) in %4% ms") %
- bucketName % path % data.size() % duration);
+ auto size = istream->tellg();
+
+ printInfo("uploaded 's3://%s/%s' (%d bytes) in %d ms",
+ bucketName, path, size, duration);
stats.putTimeMs += duration;
- stats.putBytes += data.size();
+ stats.putBytes += size;
stats.put++;
}
- void upsertFile(const std::string & path, Source & source,
+ void upsertFile(const std::string & path,
+ std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType) override
{
- auto data = source.drain();
+ auto compress = [&](std::string compression)
+ {
+ auto compressed = nix::compress(compression, StreamToSourceAdapter(istream).drain());
+ return std::make_shared<std::stringstream>(std::move(*compressed));
+ };
+
if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
- uploadFile(path, *compress(narinfoCompression, data), mimeType, narinfoCompression);
+ uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression);
else if (lsCompression != "" && hasSuffix(path, ".ls"))
- uploadFile(path, *compress(lsCompression, data), mimeType, lsCompression);
+ uploadFile(path, compress(lsCompression), mimeType, lsCompression);
else if (logCompression != "" && hasPrefix(path, "log/"))
- uploadFile(path, *compress(logCompression, data), mimeType, logCompression);
+ uploadFile(path, compress(logCompression), mimeType, logCompression);
else
- uploadFile(path, data, mimeType, "");
+ uploadFile(path, istream, mimeType, "");
}
void getFile(const std::string & path, Sink & sink) override
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 84a4eb001..8386a4991 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -349,4 +349,27 @@ Source & operator >> (Source & in, bool & b)
}
+/* An adapter that converts a std::basic_istream into a source. */
+struct StreamToSourceAdapter : Source
+{
+ std::shared_ptr<std::basic_istream<char>> istream;
+
+ StreamToSourceAdapter(std::shared_ptr<std::basic_istream<char>> istream)
+ : istream(istream)
+ { }
+
+ size_t read(unsigned char * data, size_t len) override
+ {
+ if (!istream->read((char *) data, len)) {
+ if (istream->eof()) {
+ if (istream->gcount() == 0)
+ throw EndOfFile("end of file");
+ } else
+ throw Error("I/O error in StreamToSourceAdapter");
+ }
+ return istream->gcount();
+ }
+};
+
+
}