aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libstore/local-store.cc16
-rw-r--r--src/libstore/local-store.hh2
-rw-r--r--src/libutil/serialise.cc33
-rw-r--r--src/libutil/serialise.hh11
4 files changed, 40 insertions, 22 deletions
diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc
index 925ac25bf..dac7a50c4 100644
--- a/src/libstore/local-store.cc
+++ b/src/libstore/local-store.cc
@@ -1036,11 +1036,13 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source,
StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method, HashType hashAlgo, RepairFlag repair)
{
- return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) {
+ return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & wanted) {
while (1) {
- uint8_t buf[1];
- auto n = dump.read(buf, 1);
+ constexpr size_t bufSize = 1024;
+ uint8_t buf[bufSize];
+ auto n = dump.read(buf, std::min(wanted, bufSize));
sink(buf, n);
+ // when control is yielded back to us wanted will be updated.
}
});
}
@@ -1051,7 +1053,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
{
Path srcPath(absPath(_srcPath));
- return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) {
+ return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & _) {
if (method == FileIngestionMethod::Recursive)
dumpPath(srcPath, sink, filter);
else
@@ -1062,7 +1064,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath,
StorePath LocalStore::addToStoreCommon(
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
- std::function<void(Sink &)> demux)
+ std::function<void(Sink &, size_t &)> demux)
{
/* For computing the NAR hash. */
auto sha256Sink = std::make_unique<HashSink>(htSHA256);
@@ -1083,7 +1085,7 @@ StorePath LocalStore::addToStoreCommon(
bool inMemory = true;
std::string nar;
- auto source = sinkToSource([&](Sink & sink) {
+ auto source = sinkToSource([&](Sink & sink, size_t & wanted) {
LambdaSink sink2([&](const unsigned char * buf, size_t len) {
(*sha256Sink)(buf, len);
if (hashSink) (*hashSink)(buf, len);
@@ -1101,7 +1103,7 @@ StorePath LocalStore::addToStoreCommon(
if (!inMemory) sink(buf, len);
});
- demux(sink2);
+ demux(sink2, wanted);
});
std::unique_ptr<AutoDelete> delTempDir;
diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh
index 215731f87..ae23004c4 100644
--- a/src/libstore/local-store.hh
+++ b/src/libstore/local-store.hh
@@ -292,7 +292,7 @@ private:
StorePath addToStoreCommon(
const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair,
- std::function<void(Sink &)> demux);
+ std::function<void(Sink &, size_t &)> demux);
Path getRealStoreDir() override { return realStoreDir; }
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index c8b71188f..141e9e976 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -165,35 +165,43 @@ size_t StringSource::read(unsigned char * data, size_t len)
#endif
std::unique_ptr<Source> sinkToSource(
- std::function<void(Sink &)> fun,
+ std::function<void(Sink &, size_t &)> fun,
std::function<void()> eof)
{
struct SinkToSource : Source
{
- typedef boost::coroutines2::coroutine<std::string> coro_t;
+ typedef boost::coroutines2::coroutine<std::basic_string<uint8_t>> coro_t;
- std::function<void(Sink &)> fun;
+ std::function<void(Sink &, size_t &)> fun;
std::function<void()> eof;
std::optional<coro_t::pull_type> coro;
bool started = false;
- SinkToSource(std::function<void(Sink &)> fun, std::function<void()> eof)
+ /* It would be nicer to have the co-routines have both args and a
+ return value, but unfortunately that was removed from Boost's
+ implementation for some reason, so we use some extra state instead.
+ */
+ size_t wanted = 0;
+
+ SinkToSource(std::function<void(Sink &, size_t &)> fun, std::function<void()> eof)
: fun(fun), eof(eof)
{
}
- std::string cur;
+ std::basic_string<uint8_t> cur;
size_t pos = 0;
size_t read(unsigned char * data, size_t len) override
{
- if (!coro)
+ wanted = len < cur.size() ? 0 : len - cur.size();
+ if (!coro) {
coro = coro_t::pull_type([&](coro_t::push_type & yield) {
- LambdaSink sink([&](const unsigned char * data, size_t len) {
- if (len) yield(std::string((const char *) data, len));
+ LambdaSink sink([&](const uint8_t * data, size_t len) {
+ if (len) yield(std::basic_string<uint8_t> { data, len });
});
- fun(sink);
+ fun(sink, wanted);
});
+ }
if (!*coro) { eof(); abort(); }
@@ -203,11 +211,10 @@ std::unique_ptr<Source> sinkToSource(
pos = 0;
}
- auto n = std::min(cur.size() - pos, len);
- memcpy(data, (unsigned char *) cur.data() + pos, n);
- pos += n;
+ auto numCopied = cur.copy(data, len, pos);
+ pos += numCopied;
- return n;
+ return numCopied;
}
};
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 8386a4991..6cb9d1bf5 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -260,11 +260,20 @@ struct LambdaSource : Source
/* Convert a function that feeds data into a Sink into a Source. The
Source executes the function as a coroutine. */
std::unique_ptr<Source> sinkToSource(
- std::function<void(Sink &)> fun,
+ std::function<void(Sink &, size_t &)> fun,
std::function<void()> eof = []() {
throw EndOfFile("coroutine has finished");
});
+static inline std::unique_ptr<Source> sinkToSource(
+ std::function<void(Sink &)> fun,
+ std::function<void()> eof = []() {
+ throw EndOfFile("coroutine has finished");
+ })
+{
+ return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof);
+}
+
void writePadding(size_t len, Sink & sink);
void writeString(const unsigned char * buf, size_t len, Sink & sink);