From 4c0077a07d2ee5362bd8ffb0ea9ee053c759303d Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 29 Jul 2020 00:48:39 +0200 Subject: Fix RemoteStore::addToStore() latency Since 6185d25e523a3cd223dd6f6aca10cf6ff15b4823, this was very latency-bound since it required a round-trip for every 32 KiB. So for example copying a 514 MiB closure over a virtual ethernet device with a articial delay of just 1 ms took 343s. Now it takes 2.7s. Fixes #3372. --- src/libstore/daemon.cc | 90 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 15 deletions(-) (limited to 'src/libstore/daemon.cc') diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 503e04f92..478ae39ca 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -86,7 +86,7 @@ struct TunnelLogger : public Logger } /* startWork() means that we're starting an operation for which we - want to send out stderr to the client. */ + want to send out stderr to the client. */ void startWork() { auto state(state_.lock()); @@ -703,24 +703,84 @@ static void performOp(TunnelLogger * logger, ref store, if (!trusted) info.ultimate = false; - std::unique_ptr source; - if (GET_PROTOCOL_MINOR(clientVersion) >= 21) - source = std::make_unique(from, to); - else { - StringSink saved; - TeeSource tee { from, saved }; - ParseSink ether; - parseDump(ether, tee); - source = std::make_unique(std::move(*saved.s)); + if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { + + struct FramedSource : Source + { + Source & from; + bool eof = false; + std::vector pending; + size_t pos = 0; + + FramedSource(Source & from) : from(from) + { } + + ~FramedSource() + { + if (!eof) { + while (true) { + auto n = readInt(from); + if (!n) break; + std::vector data(n); + from(data.data(), n); + } + } + } + + size_t read(unsigned char * data, size_t len) override + { + if (eof) throw EndOfFile("reached end of FramedSource"); + + if (pos >= pending.size()) { + size_t len = readInt(from); + if (!len) { + eof = true; + return 0; + } + pending = std::vector(len); + pos = 0; + from(pending.data(), len); + } + + auto n = std::min(len, pending.size() - pos); + memcpy(data, pending.data() + pos, n); + pos += n; + return n; + } + }; + + logger->startWork(); + + { + FramedSource source(from); + store->addToStore(info, source, (RepairFlag) repair, + dontCheckSigs ? NoCheckSigs : CheckSigs); + } + + logger->stopWork(); } - logger->startWork(); + else { + std::unique_ptr source; + if (GET_PROTOCOL_MINOR(clientVersion) >= 21) + source = std::make_unique(from, to); + else { + StringSink saved; + TeeSource tee { from, saved }; + ParseSink ether; + parseDump(ether, tee); + source = std::make_unique(std::move(*saved.s)); + } - // FIXME: race if addToStore doesn't read source? - store->addToStore(info, *source, (RepairFlag) repair, - dontCheckSigs ? NoCheckSigs : CheckSigs); + logger->startWork(); + + // FIXME: race if addToStore doesn't read source? + store->addToStore(info, *source, (RepairFlag) repair, + dontCheckSigs ? NoCheckSigs : CheckSigs); + + logger->stopWork(); + } - logger->stopWork(); break; } -- cgit v1.2.3