aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/daemon.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2020-07-29 00:48:39 +0200
committerEelco Dolstra <edolstra@gmail.com>2020-07-29 00:48:39 +0200
commit4c0077a07d2ee5362bd8ffb0ea9ee053c759303d (patch)
tree16406f4c2d649ad65c534a0495e1c2a3fc056416 /src/libstore/daemon.cc
parentc159f48a39835d5b2fe7c1ddd4467bc093ee251f (diff)
Fix RemoteStore::addToStore() latency
Since 6185d25e523a3cd223dd6f6aca10cf6ff15b4823, this was very latency-bound since it required a round-trip for every 32 KiB. So for example copying a 514 MiB closure over a virtual ethernet device with a articial delay of just 1 ms took 343s. Now it takes 2.7s. Fixes #3372.
Diffstat (limited to 'src/libstore/daemon.cc')
-rw-r--r--src/libstore/daemon.cc90
1 files changed, 75 insertions, 15 deletions
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
index 503e04f92..478ae39ca 100644
--- a/src/libstore/daemon.cc
+++ b/src/libstore/daemon.cc
@@ -86,7 +86,7 @@ struct TunnelLogger : public Logger
}
/* startWork() means that we're starting an operation for which we
- want to send out stderr to the client. */
+ want to send out stderr to the client. */
void startWork()
{
auto state(state_.lock());
@@ -703,24 +703,84 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (!trusted)
info.ultimate = false;
- std::unique_ptr<Source> source;
- if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
- source = std::make_unique<TunnelSource>(from, to);
- else {
- StringSink saved;
- TeeSource tee { from, saved };
- ParseSink ether;
- parseDump(ether, tee);
- source = std::make_unique<StringSource>(std::move(*saved.s));
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
+
+ struct FramedSource : Source
+ {
+ Source & from;
+ bool eof = false;
+ std::vector<unsigned char> pending;
+ size_t pos = 0;
+
+ FramedSource(Source & from) : from(from)
+ { }
+
+ ~FramedSource()
+ {
+ if (!eof) {
+ while (true) {
+ auto n = readInt(from);
+ if (!n) break;
+ std::vector<unsigned char> data(n);
+ from(data.data(), n);
+ }
+ }
+ }
+
+ size_t read(unsigned char * data, size_t len) override
+ {
+ if (eof) throw EndOfFile("reached end of FramedSource");
+
+ if (pos >= pending.size()) {
+ size_t len = readInt(from);
+ if (!len) {
+ eof = true;
+ return 0;
+ }
+ pending = std::vector<unsigned char>(len);
+ pos = 0;
+ from(pending.data(), len);
+ }
+
+ auto n = std::min(len, pending.size() - pos);
+ memcpy(data, pending.data() + pos, n);
+ pos += n;
+ return n;
+ }
+ };
+
+ logger->startWork();
+
+ {
+ FramedSource source(from);
+ store->addToStore(info, source, (RepairFlag) repair,
+ dontCheckSigs ? NoCheckSigs : CheckSigs);
+ }
+
+ logger->stopWork();
}
- logger->startWork();
+ else {
+ std::unique_ptr<Source> source;
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
+ source = std::make_unique<TunnelSource>(from, to);
+ else {
+ StringSink saved;
+ TeeSource tee { from, saved };
+ ParseSink ether;
+ parseDump(ether, tee);
+ source = std::make_unique<StringSource>(std::move(*saved.s));
+ }
- // FIXME: race if addToStore doesn't read source?
- store->addToStore(info, *source, (RepairFlag) repair,
- dontCheckSigs ? NoCheckSigs : CheckSigs);
+ logger->startWork();
+
+ // FIXME: race if addToStore doesn't read source?
+ store->addToStore(info, *source, (RepairFlag) repair,
+ dontCheckSigs ? NoCheckSigs : CheckSigs);
+
+ logger->stopWork();
+ }
- logger->stopWork();
break;
}