diff options
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/daemon.cc | 90 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 81 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 7 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.hh | 1 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 24 | ||||
-rw-r--r-- | src/libstore/worker-protocol.hh | 2 |
6 files changed, 167 insertions, 38 deletions
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 503e04f92..478ae39ca 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -86,7 +86,7 @@ struct TunnelLogger : public Logger } /* startWork() means that we're starting an operation for which we - want to send out stderr to the client. */ + want to send out stderr to the client. */ void startWork() { auto state(state_.lock()); @@ -703,24 +703,84 @@ static void performOp(TunnelLogger * logger, ref<Store> store, if (!trusted) info.ultimate = false; - std::unique_ptr<Source> source; - if (GET_PROTOCOL_MINOR(clientVersion) >= 21) - source = std::make_unique<TunnelSource>(from, to); - else { - StringSink saved; - TeeSource tee { from, saved }; - ParseSink ether; - parseDump(ether, tee); - source = std::make_unique<StringSource>(std::move(*saved.s)); + if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { + + struct FramedSource : Source + { + Source & from; + bool eof = false; + std::vector<unsigned char> pending; + size_t pos = 0; + + FramedSource(Source & from) : from(from) + { } + + ~FramedSource() + { + if (!eof) { + while (true) { + auto n = readInt(from); + if (!n) break; + std::vector<unsigned char> data(n); + from(data.data(), n); + } + } + } + + size_t read(unsigned char * data, size_t len) override + { + if (eof) throw EndOfFile("reached end of FramedSource"); + + if (pos >= pending.size()) { + size_t len = readInt(from); + if (!len) { + eof = true; + return 0; + } + pending = std::vector<unsigned char>(len); + pos = 0; + from(pending.data(), len); + } + + auto n = std::min(len, pending.size() - pos); + memcpy(data, pending.data() + pos, n); + pos += n; + return n; + } + }; + + logger->startWork(); + + { + FramedSource source(from); + store->addToStore(info, source, (RepairFlag) repair, + dontCheckSigs ? NoCheckSigs : CheckSigs); + } + + logger->stopWork(); } - logger->startWork(); + else { + std::unique_ptr<Source> source; + if (GET_PROTOCOL_MINOR(clientVersion) >= 21) + source = std::make_unique<TunnelSource>(from, to); + else { + StringSink saved; + TeeSource tee { from, saved }; + ParseSink ether; + parseDump(ether, tee); + source = std::make_unique<StringSource>(std::move(*saved.s)); + } - // FIXME: race if addToStore doesn't read source? - store->addToStore(info, *source, (RepairFlag) repair, - dontCheckSigs ? NoCheckSigs : CheckSigs); + logger->startWork(); + + // FIXME: race if addToStore doesn't read source? + store->addToStore(info, *source, (RepairFlag) repair, + dontCheckSigs ? NoCheckSigs : CheckSigs); + + logger->stopWork(); + } - logger->stopWork(); break; } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 8d01c6667..4ed81d9d8 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -503,9 +503,84 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, conn->to << info.registrationTime << info.narSize << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; - bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21; - if (!tunnel) copyNAR(source, conn->to); - conn.processStderr(0, tunnel ? &source : nullptr); + + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { + + std::exception_ptr ex; + + struct FramedSink : BufferedSink + { + ConnectionHandle & conn; + std::exception_ptr & ex; + + FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex) + { } + + ~FramedSink() + { + try { + conn->to << 0; + conn->to.flush(); + } catch (...) { + ignoreException(); + } + } + + void write(const unsigned char * data, size_t len) override + { + /* Don't send more data if the remote has + encountered an error. */ + if (ex) { + auto ex2 = ex; + ex = nullptr; + std::rethrow_exception(ex2); + } + conn->to << len; + conn->to(data, len); + }; + }; + + /* Handle log messages / exceptions from the remote on a + separate thread. */ + std::thread stderrThread([&]() + { + try { + conn.processStderr(0, nullptr); + } catch (...) { + ex = std::current_exception(); + } + }); + + Finally joinStderrThread([&]() + { + if (stderrThread.joinable()) { + stderrThread.join(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (...) { + ignoreException(); + } + } + } + }); + + { + FramedSink sink(conn, ex); + copyNAR(source, sink); + sink.flush(); + } + + stderrThread.join(); + if (ex) + std::rethrow_exception(ex); + + } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { + conn.processStderr(0, &source); + } else { + copyNAR(source, conn->to); + conn.processStderr(0, nullptr); + } } } diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 1b7dff085..67935f3ba 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -343,13 +343,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1) .count(); - auto size = istream->tellg(); - - printInfo("uploaded 's3://%s/%s' (%d bytes) in %d ms", - bucketName, path, size, duration); + printInfo("uploaded 's3://%s/%s' in %d ms", + bucketName, path, duration); stats.putTimeMs += duration; - stats.putBytes += size; stats.put++; } diff --git a/src/libstore/s3-binary-cache-store.hh b/src/libstore/s3-binary-cache-store.hh index 4d43fe4d2..b2b75d498 100644 --- a/src/libstore/s3-binary-cache-store.hh +++ b/src/libstore/s3-binary-cache-store.hh @@ -19,7 +19,6 @@ public: struct Stats { std::atomic<uint64_t> put{0}; - std::atomic<uint64_t> putBytes{0}; std::atomic<uint64_t> putTimeMs{0}; std::atomic<uint64_t> get{0}; std::atomic<uint64_t> getBytes{0}; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index b6e6952b7..c804399d2 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -729,9 +729,9 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st { auto valid = dstStore->queryValidPaths(storePaths, substitute); - PathSet missing; + StorePathSet missing; for (auto & path : storePaths) - if (!valid.count(path)) missing.insert(srcStore->printStorePath(path)); + if (!valid.count(path)) missing.insert(path); if (missing.empty()) return; @@ -748,29 +748,27 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st ThreadPool pool; - processGraph<Path>(pool, - PathSet(missing.begin(), missing.end()), + processGraph<StorePath>(pool, + StorePathSet(missing.begin(), missing.end()), - [&](const Path & storePath) { - if (dstStore->isValidPath(dstStore->parseStorePath(storePath))) { + [&](const StorePath & storePath) { + if (dstStore->isValidPath(storePath)) { nrDone++; showProgress(); - return PathSet(); + return StorePathSet(); } - auto info = srcStore->queryPathInfo(srcStore->parseStorePath(storePath)); + auto info = srcStore->queryPathInfo(storePath); bytesExpected += info->narSize; act.setExpected(actCopyPath, bytesExpected); - return srcStore->printStorePathSet(info->references); + return info->references; }, - [&](const Path & storePathS) { + [&](const StorePath & storePath) { checkInterrupt(); - auto storePath = dstStore->parseStorePath(storePathS); - if (!dstStore->isValidPath(storePath)) { MaintainCount<decltype(nrRunning)> mc(nrRunning); showProgress(); @@ -780,7 +778,7 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st nrFailed++; if (!settings.keepGoing) throw e; - logger->log(lvlError, fmt("could not copy %s: %s", storePathS, e.what())); + logger->log(lvlError, fmt("could not copy %s: %s", dstStore->printStorePath(storePath), e.what())); showProgress(); return; } diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 8b538f6da..dcba73116 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -6,7 +6,7 @@ namespace nix { #define WORKER_MAGIC_1 0x6e697863 #define WORKER_MAGIC_2 0x6478696f -#define PROTOCOL_VERSION 0x116 +#define PROTOCOL_VERSION 0x117 #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) |