aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/daemon.cc90
-rw-r--r--src/libstore/remote-store.cc81
-rw-r--r--src/libstore/s3-binary-cache-store.cc7
-rw-r--r--src/libstore/s3-binary-cache-store.hh1
-rw-r--r--src/libstore/store-api.cc24
-rw-r--r--src/libstore/worker-protocol.hh2
6 files changed, 167 insertions, 38 deletions
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
index 503e04f92..478ae39ca 100644
--- a/src/libstore/daemon.cc
+++ b/src/libstore/daemon.cc
@@ -86,7 +86,7 @@ struct TunnelLogger : public Logger
}
/* startWork() means that we're starting an operation for which we
- want to send out stderr to the client. */
+ want to send out stderr to the client. */
void startWork()
{
auto state(state_.lock());
@@ -703,24 +703,84 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (!trusted)
info.ultimate = false;
- std::unique_ptr<Source> source;
- if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
- source = std::make_unique<TunnelSource>(from, to);
- else {
- StringSink saved;
- TeeSource tee { from, saved };
- ParseSink ether;
- parseDump(ether, tee);
- source = std::make_unique<StringSource>(std::move(*saved.s));
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
+
+ struct FramedSource : Source
+ {
+ Source & from;
+ bool eof = false;
+ std::vector<unsigned char> pending;
+ size_t pos = 0;
+
+ FramedSource(Source & from) : from(from)
+ { }
+
+ ~FramedSource()
+ {
+ if (!eof) {
+ while (true) {
+ auto n = readInt(from);
+ if (!n) break;
+ std::vector<unsigned char> data(n);
+ from(data.data(), n);
+ }
+ }
+ }
+
+ size_t read(unsigned char * data, size_t len) override
+ {
+ if (eof) throw EndOfFile("reached end of FramedSource");
+
+ if (pos >= pending.size()) {
+ size_t len = readInt(from);
+ if (!len) {
+ eof = true;
+ return 0;
+ }
+ pending = std::vector<unsigned char>(len);
+ pos = 0;
+ from(pending.data(), len);
+ }
+
+ auto n = std::min(len, pending.size() - pos);
+ memcpy(data, pending.data() + pos, n);
+ pos += n;
+ return n;
+ }
+ };
+
+ logger->startWork();
+
+ {
+ FramedSource source(from);
+ store->addToStore(info, source, (RepairFlag) repair,
+ dontCheckSigs ? NoCheckSigs : CheckSigs);
+ }
+
+ logger->stopWork();
}
- logger->startWork();
+ else {
+ std::unique_ptr<Source> source;
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
+ source = std::make_unique<TunnelSource>(from, to);
+ else {
+ StringSink saved;
+ TeeSource tee { from, saved };
+ ParseSink ether;
+ parseDump(ether, tee);
+ source = std::make_unique<StringSource>(std::move(*saved.s));
+ }
- // FIXME: race if addToStore doesn't read source?
- store->addToStore(info, *source, (RepairFlag) repair,
- dontCheckSigs ? NoCheckSigs : CheckSigs);
+ logger->startWork();
+
+ // FIXME: race if addToStore doesn't read source?
+ store->addToStore(info, *source, (RepairFlag) repair,
+ dontCheckSigs ? NoCheckSigs : CheckSigs);
+
+ logger->stopWork();
+ }
- logger->stopWork();
break;
}
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 8d01c6667..4ed81d9d8 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -503,9 +503,84 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
- bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
- if (!tunnel) copyNAR(source, conn->to);
- conn.processStderr(0, tunnel ? &source : nullptr);
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
+
+ std::exception_ptr ex;
+
+ struct FramedSink : BufferedSink
+ {
+ ConnectionHandle & conn;
+ std::exception_ptr & ex;
+
+ FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
+ { }
+
+ ~FramedSink()
+ {
+ try {
+ conn->to << 0;
+ conn->to.flush();
+ } catch (...) {
+ ignoreException();
+ }
+ }
+
+ void write(const unsigned char * data, size_t len) override
+ {
+ /* Don't send more data if the remote has
+ encountered an error. */
+ if (ex) {
+ auto ex2 = ex;
+ ex = nullptr;
+ std::rethrow_exception(ex2);
+ }
+ conn->to << len;
+ conn->to(data, len);
+ };
+ };
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ conn.processStderr(0, nullptr);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink(conn, ex);
+ copyNAR(source, sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+ } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
+ conn.processStderr(0, &source);
+ } else {
+ copyNAR(source, conn->to);
+ conn.processStderr(0, nullptr);
+ }
}
}
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 1b7dff085..67935f3ba 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -343,13 +343,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1)
.count();
- auto size = istream->tellg();
-
- printInfo("uploaded 's3://%s/%s' (%d bytes) in %d ms",
- bucketName, path, size, duration);
+ printInfo("uploaded 's3://%s/%s' in %d ms",
+ bucketName, path, duration);
stats.putTimeMs += duration;
- stats.putBytes += size;
stats.put++;
}
diff --git a/src/libstore/s3-binary-cache-store.hh b/src/libstore/s3-binary-cache-store.hh
index 4d43fe4d2..b2b75d498 100644
--- a/src/libstore/s3-binary-cache-store.hh
+++ b/src/libstore/s3-binary-cache-store.hh
@@ -19,7 +19,6 @@ public:
struct Stats
{
std::atomic<uint64_t> put{0};
- std::atomic<uint64_t> putBytes{0};
std::atomic<uint64_t> putTimeMs{0};
std::atomic<uint64_t> get{0};
std::atomic<uint64_t> getBytes{0};
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index b6e6952b7..c804399d2 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -729,9 +729,9 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
{
auto valid = dstStore->queryValidPaths(storePaths, substitute);
- PathSet missing;
+ StorePathSet missing;
for (auto & path : storePaths)
- if (!valid.count(path)) missing.insert(srcStore->printStorePath(path));
+ if (!valid.count(path)) missing.insert(path);
if (missing.empty()) return;
@@ -748,29 +748,27 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
ThreadPool pool;
- processGraph<Path>(pool,
- PathSet(missing.begin(), missing.end()),
+ processGraph<StorePath>(pool,
+ StorePathSet(missing.begin(), missing.end()),
- [&](const Path & storePath) {
- if (dstStore->isValidPath(dstStore->parseStorePath(storePath))) {
+ [&](const StorePath & storePath) {
+ if (dstStore->isValidPath(storePath)) {
nrDone++;
showProgress();
- return PathSet();
+ return StorePathSet();
}
- auto info = srcStore->queryPathInfo(srcStore->parseStorePath(storePath));
+ auto info = srcStore->queryPathInfo(storePath);
bytesExpected += info->narSize;
act.setExpected(actCopyPath, bytesExpected);
- return srcStore->printStorePathSet(info->references);
+ return info->references;
},
- [&](const Path & storePathS) {
+ [&](const StorePath & storePath) {
checkInterrupt();
- auto storePath = dstStore->parseStorePath(storePathS);
-
if (!dstStore->isValidPath(storePath)) {
MaintainCount<decltype(nrRunning)> mc(nrRunning);
showProgress();
@@ -780,7 +778,7 @@ void copyPaths(ref<Store> srcStore, ref<Store> dstStore, const StorePathSet & st
nrFailed++;
if (!settings.keepGoing)
throw e;
- logger->log(lvlError, fmt("could not copy %s: %s", storePathS, e.what()));
+ logger->log(lvlError, fmt("could not copy %s: %s", dstStore->printStorePath(storePath), e.what()));
showProgress();
return;
}
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 8b538f6da..dcba73116 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f
-#define PROTOCOL_VERSION 0x116
+#define PROTOCOL_VERSION 0x117
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)