aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/build.cc8
-rw-r--r--src/libstore/content-address.cc62
-rw-r--r--src/libstore/content-address.hh19
-rw-r--r--src/libstore/daemon.cc176
-rw-r--r--src/libstore/remote-store.cc276
-rw-r--r--src/libstore/remote-store.hh13
-rw-r--r--src/libstore/store-api.hh3
-rw-r--r--src/libstore/worker-protocol.hh4
-rw-r--r--src/libutil/serialise.cc13
-rw-r--r--src/libutil/serialise.hh91
-rw-r--r--src/libutil/split.hh2
11 files changed, 413 insertions, 254 deletions
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index 0b51d90ea..07c5bceb2 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -2950,14 +2950,6 @@ struct RestrictedStore : public LocalFSStore, public virtual RestrictedStoreConf
goal.addDependency(info.path);
}
- StorePath addToStoreFromDump(Source & dump, const string & name,
- FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override
- {
- auto path = next->addToStoreFromDump(dump, name, method, hashAlgo, repair);
- goal.addDependency(path);
- return path;
- }
-
StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair = NoRepair) override
{
diff --git a/src/libstore/content-address.cc b/src/libstore/content-address.cc
index 0885c3d0e..74215c545 100644
--- a/src/libstore/content-address.cc
+++ b/src/libstore/content-address.cc
@@ -37,48 +37,86 @@ std::string renderContentAddress(ContentAddress ca) {
}, ca);
}
-ContentAddress parseContentAddress(std::string_view rawCa) {
- auto rest = rawCa;
+std::string renderContentAddressMethod(ContentAddressMethod cam) {
+ return std::visit(overloaded {
+ [](TextHashMethod &th) {
+ return std::string{"text:"} + printHashType(htSHA256);
+ },
+ [](FixedOutputHashMethod &fshm) {
+ return "fixed:" + makeFileIngestionPrefix(fshm.fileIngestionMethod) + printHashType(fshm.hashType);
+ }
+ }, cam);
+}
+
+/*
+ Parses content address strings up to the hash.
+ */
+static ContentAddressMethod parseContentAddressMethodPrefix(std::string_view & rest) {
+ std::string_view wholeInput { rest };
std::string_view prefix;
{
auto optPrefix = splitPrefixTo(rest, ':');
if (!optPrefix)
- throw UsageError("not a content address because it is not in the form '<prefix>:<rest>': %s", rawCa);
+ throw UsageError("not a content address because it is not in the form '<prefix>:<rest>': %s", wholeInput);
prefix = *optPrefix;
}
auto parseHashType_ = [&](){
auto hashTypeRaw = splitPrefixTo(rest, ':');
if (!hashTypeRaw)
- throw UsageError("content address hash must be in form '<algo>:<hash>', but found: %s", rawCa);
+ throw UsageError("content address hash must be in form '<algo>:<hash>', but found: %s", wholeInput);
HashType hashType = parseHashType(*hashTypeRaw);
return std::move(hashType);
};
// Switch on prefix
if (prefix == "text") {
- // No parsing of the method, "text" only support flat.
+ // No parsing of the ingestion method, "text" only support flat.
HashType hashType = parseHashType_();
if (hashType != htSHA256)
throw Error("text content address hash should use %s, but instead uses %s",
printHashType(htSHA256), printHashType(hashType));
- return TextHash {
- .hash = Hash::parseNonSRIUnprefixed(rest, std::move(hashType)),
- };
+ return TextHashMethod {};
} else if (prefix == "fixed") {
// Parse method
auto method = FileIngestionMethod::Flat;
if (splitPrefix(rest, "r:"))
method = FileIngestionMethod::Recursive;
HashType hashType = parseHashType_();
- return FixedOutputHash {
- .method = method,
- .hash = Hash::parseNonSRIUnprefixed(rest, std::move(hashType)),
+ return FixedOutputHashMethod {
+ .fileIngestionMethod = method,
+ .hashType = std::move(hashType),
};
} else
throw UsageError("content address prefix '%s' is unrecognized. Recogonized prefixes are 'text' or 'fixed'", prefix);
-};
+}
+
+ContentAddress parseContentAddress(std::string_view rawCa) {
+ auto rest = rawCa;
+
+ ContentAddressMethod caMethod = parseContentAddressMethodPrefix(rest);
+
+ return std::visit(
+ overloaded {
+ [&](TextHashMethod thm) {
+ return ContentAddress(TextHash {
+ .hash = Hash::parseNonSRIUnprefixed(rest, htSHA256)
+ });
+ },
+ [&](FixedOutputHashMethod fohMethod) {
+ return ContentAddress(FixedOutputHash {
+ .method = fohMethod.fileIngestionMethod,
+ .hash = Hash::parseNonSRIUnprefixed(rest, std::move(fohMethod.hashType)),
+ });
+ },
+ }, caMethod);
+}
+
+ContentAddressMethod parseContentAddressMethod(std::string_view caMethod) {
+ std::string_view asPrefix {std::string{caMethod} + ":"};
+ return parseContentAddressMethodPrefix(asPrefix);
+}
std::optional<ContentAddress> parseContentAddressOpt(std::string_view rawCaOpt) {
return rawCaOpt == "" ? std::optional<ContentAddress> {} : parseContentAddress(rawCaOpt);
diff --git a/src/libstore/content-address.hh b/src/libstore/content-address.hh
index 22a039242..f6a6f5140 100644
--- a/src/libstore/content-address.hh
+++ b/src/libstore/content-address.hh
@@ -55,4 +55,23 @@ std::optional<ContentAddress> parseContentAddressOpt(std::string_view rawCaOpt);
Hash getContentAddressHash(const ContentAddress & ca);
+/*
+ We only have one way to hash text with references, so this is single-value
+ type is only useful in std::variant.
+*/
+struct TextHashMethod { };
+struct FixedOutputHashMethod {
+ FileIngestionMethod fileIngestionMethod;
+ HashType hashType;
+};
+
+typedef std::variant<
+ TextHashMethod,
+ FixedOutputHashMethod
+ > ContentAddressMethod;
+
+ContentAddressMethod parseContentAddressMethod(std::string_view rawCaMethod);
+
+std::string renderContentAddressMethod(ContentAddressMethod caMethod);
+
}
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
index 8adabf549..8897a73f4 100644
--- a/src/libstore/daemon.cc
+++ b/src/libstore/daemon.cc
@@ -2,7 +2,6 @@
#include "monitor-fd.hh"
#include "worker-protocol.hh"
#include "store-api.hh"
-#include "local-store.hh"
#include "finally.hh"
#include "affinity.hh"
#include "archive.hh"
@@ -240,6 +239,18 @@ struct ClientSettings
}
};
+static void writeValidPathInfo(ref<Store> store, unsigned int clientVersion, Sink & to, std::shared_ptr<const ValidPathInfo> info) {
+ to << (info->deriver ? store->printStorePath(*info->deriver) : "")
+ << info->narHash.to_string(Base16, false);
+ writeStorePaths(*store, to, info->references);
+ to << info->registrationTime << info->narSize;
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
+ to << info->ultimate
+ << info->sigs
+ << renderContentAddress(info->ca);
+ }
+}
+
static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion,
Source & from, BufferedSink & to, unsigned int op)
@@ -350,47 +361,83 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case wopAddToStore: {
- HashType hashAlgo;
- std::string baseName;
- FileIngestionMethod method;
- {
- bool fixed;
- uint8_t recursive;
- std::string hashAlgoRaw;
- from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
- if (recursive > (uint8_t) FileIngestionMethod::Recursive)
- throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
- method = FileIngestionMethod { recursive };
- /* Compatibility hack. */
- if (!fixed) {
- hashAlgoRaw = "sha256";
- method = FileIngestionMethod::Recursive;
+ if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
+ auto name = readString(from);
+ auto camStr = readString(from);
+ auto refs = readStorePaths<StorePathSet>(*store, from);
+ bool repairBool;
+ from >> repairBool;
+ auto repair = RepairFlag{repairBool};
+
+ logger->startWork();
+ auto pathInfo = [&]() {
+ // NB: FramedSource must be out of scope before logger->stopWork();
+ ContentAddressMethod contentAddressMethod = parseContentAddressMethod(camStr);
+ FramedSource source(from);
+ // TODO this is essentially RemoteStore::addCAToStore. Move it up to Store.
+ return std::visit(overloaded {
+ [&](TextHashMethod &_) {
+ // We could stream this by changing Store
+ std::string contents = source.drain();
+ auto path = store->addTextToStore(name, contents, refs, repair);
+ return store->queryPathInfo(path);
+ },
+ [&](FixedOutputHashMethod &fohm) {
+ if (!refs.empty())
+ throw UnimplementedError("cannot yet have refs with flat or nar-hashed data");
+ auto path = store->addToStoreFromDump(source, name, fohm.fileIngestionMethod, fohm.hashType, repair);
+ return store->queryPathInfo(path);
+ },
+ }, contentAddressMethod);
+ }();
+ logger->stopWork();
+
+ to << store->printStorePath(pathInfo->path);
+ writeValidPathInfo(store, clientVersion, to, pathInfo);
+
+ } else {
+ HashType hashAlgo;
+ std::string baseName;
+ FileIngestionMethod method;
+ {
+ bool fixed;
+ uint8_t recursive;
+ std::string hashAlgoRaw;
+ from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw;
+ if (recursive > (uint8_t) FileIngestionMethod::Recursive)
+ throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive);
+ method = FileIngestionMethod { recursive };
+ /* Compatibility hack. */
+ if (!fixed) {
+ hashAlgoRaw = "sha256";
+ method = FileIngestionMethod::Recursive;
+ }
+ hashAlgo = parseHashType(hashAlgoRaw);
}
- hashAlgo = parseHashType(hashAlgoRaw);
- }
- StringSink saved;
- TeeSource savedNARSource(from, saved);
- RetrieveRegularNARSink savedRegular { saved };
+ StringSink saved;
+ TeeSource savedNARSource(from, saved);
+ RetrieveRegularNARSink savedRegular { saved };
- if (method == FileIngestionMethod::Recursive) {
- /* Get the entire NAR dump from the client and save it to
- a string so that we can pass it to
- addToStoreFromDump(). */
- ParseSink sink; /* null sink; just parse the NAR */
- parseDump(sink, savedNARSource);
- } else
- parseDump(savedRegular, from);
+ if (method == FileIngestionMethod::Recursive) {
+ /* Get the entire NAR dump from the client and save it to
+ a string so that we can pass it to
+ addToStoreFromDump(). */
+ ParseSink sink; /* null sink; just parse the NAR */
+ parseDump(sink, savedNARSource);
+ } else
+ parseDump(savedRegular, from);
- logger->startWork();
- if (!savedRegular.regular) throw Error("regular file expected");
+ logger->startWork();
+ if (!savedRegular.regular) throw Error("regular file expected");
- // FIXME: try to stream directly from `from`.
- StringSource dumpSource { *saved.s };
- auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
- logger->stopWork();
+ // FIXME: try to stream directly from `from`.
+ StringSource dumpSource { *saved.s };
+ auto path = store->addToStoreFromDump(dumpSource, baseName, method, hashAlgo);
+ logger->stopWork();
- to << store->printStorePath(path);
+ to << store->printStorePath(path);
+ }
break;
}
@@ -675,15 +722,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
to << 1;
- to << (info->deriver ? store->printStorePath(*info->deriver) : "")
- << info->narHash.to_string(Base16, false);
- writeStorePaths(*store, to, info->references);
- to << info->registrationTime << info->narSize;
- if (GET_PROTOCOL_MINOR(clientVersion) >= 16) {
- to << info->ultimate
- << info->sigs
- << renderContentAddress(info->ca);
- }
+ writeValidPathInfo(store, clientVersion, to, info);
} else {
assert(GET_PROTOCOL_MINOR(clientVersion) >= 17);
to << 0;
@@ -749,59 +788,12 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
info.ultimate = false;
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {
-
- struct FramedSource : Source
- {
- Source & from;
- bool eof = false;
- std::vector<unsigned char> pending;
- size_t pos = 0;
-
- FramedSource(Source & from) : from(from)
- { }
-
- ~FramedSource()
- {
- if (!eof) {
- while (true) {
- auto n = readInt(from);
- if (!n) break;
- std::vector<unsigned char> data(n);
- from(data.data(), n);
- }
- }
- }
-
- size_t read(unsigned char * data, size_t len) override
- {
- if (eof) throw EndOfFile("reached end of FramedSource");
-
- if (pos >= pending.size()) {
- size_t len = readInt(from);
- if (!len) {
- eof = true;
- return 0;
- }
- pending = std::vector<unsigned char>(len);
- pos = 0;
- from(pending.data(), len);
- }
-
- auto n = std::min(len, pending.size() - pos);
- memcpy(data, pending.data() + pos, n);
- pos += n;
- return n;
- }
- };
-
logger->startWork();
-
{
FramedSource source(from);
store->addToStore(info, source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs);
}
-
logger->stopWork();
}
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 27535f1d0..02a8b72b9 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -307,6 +307,8 @@ struct ConnectionHandle
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -420,11 +422,27 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
}
+ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path) {
+ auto deriver = readString(conn->from);
+ auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
+ auto info = make_ref<ValidPathInfo>(path, narHash);
+ if (deriver != "") info->deriver = parseStorePath(deriver);
+ info->references = readStorePaths<StorePathSet>(*this, conn->from);
+ conn->from >> info->registrationTime >> info->narSize;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ conn->from >> info->ultimate;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = parseContentAddressOpt(readString(conn->from));
+ }
+ return info;
+}
+
+
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
- std::shared_ptr<ValidPathInfo> info;
+ std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@@ -440,17 +458,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
- auto deriver = readString(conn->from);
- auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
- info = std::make_shared<ValidPathInfo>(path, narHash);
- if (deriver != "") info->deriver = parseStorePath(deriver);
- info->references = readStorePaths<StorePathSet>(*this, conn->from);
- conn->from >> info->registrationTime >> info->narSize;
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- conn->from >> info->ultimate;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = parseContentAddressOpt(readString(conn->from));
- }
+ info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@@ -525,6 +533,84 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
+ref<const ValidPathInfo> RemoteStore::addCAToStore(Source & dump, const string & name, ContentAddressMethod caMethod, StorePathSet references, RepairFlag repair)
+{
+ auto conn(getConnection());
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
+
+ conn->to
+ << wopAddToStore
+ << name
+ << renderContentAddressMethod(caMethod);
+ writeStorePaths(*this, conn->to, references);
+ conn->to << repair;
+
+ conn.withFramedSink([&](Sink & sink) {
+ dump.drainInto(sink);
+ });
+
+ auto path = parseStorePath(readString(conn->from));
+ return readValidPathInfo(conn, path);
+ }
+ else {
+ if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
+
+ std::visit(overloaded {
+ [&](TextHashMethod thm) -> void {
+ std::string s = dump.drain();
+ conn->to << wopAddTextToStore << name << s;
+ writeStorePaths(*this, conn->to, references);
+ conn.processStderr();
+ },
+ [&](FixedOutputHashMethod fohm) -> void {
+ conn->to
+ << wopAddToStore
+ << name
+ << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
+ << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
+ << printHashType(fohm.hashType);
+
+ try {
+ conn->to.written = 0;
+ conn->to.warn = true;
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
+ dump.drainInto(conn->to);
+ } else {
+ std::string contents = dump.drain();
+ dumpString(contents, conn->to);
+ }
+ }
+ conn->to.warn = false;
+ conn.processStderr();
+ } catch (SysError & e) {
+ /* Daemon closed while we were sending the path. Probably OOM
+ or I/O error. */
+ if (e.errNo == EPIPE)
+ try {
+ conn.processStderr();
+ } catch (EndOfFile & e) { }
+ throw;
+ }
+
+ }
+ }, caMethod);
+ auto path = parseStorePath(readString(conn->from));
+ return queryPathInfo(path);
+ }
+}
+
+StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashType, RepairFlag repair)
+{
+ StorePathSet references;
+ return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
+}
+
+
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs)
{
@@ -565,78 +651,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
-
- conn->to.flush();
-
- std::exception_ptr ex;
-
- struct FramedSink : BufferedSink
- {
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
- };
-
- /* Handle log messages / exceptions from the remote on a
- separate thread. */
- std::thread stderrThread([&]()
- {
- try {
- conn.processStderr(nullptr, nullptr, false);
- } catch (...) {
- ex = std::current_exception();
- }
- });
-
- Finally joinStderrThread([&]()
- {
- if (stderrThread.joinable()) {
- stderrThread.join();
- if (ex) {
- try {
- std::rethrow_exception(ex);
- } catch (...) {
- ignoreException();
- }
- }
- }
- });
-
- {
- FramedSink sink(conn, ex);
+ conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
- sink.flush();
- }
-
- stderrThread.join();
- if (ex)
- std::rethrow_exception(ex);
-
+ });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@@ -647,57 +664,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}
-StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
- FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
-{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
-
- Path srcPath(absPath(_srcPath));
-
- conn->to
- << wopAddToStore
- << name
- << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
- << (method == FileIngestionMethod::Recursive ? 1 : 0)
- << printHashType(hashAlgo);
-
- try {
- conn->to.written = 0;
- conn->to.warn = true;
- connections->incCapacity();
- {
- Finally cleanup([&]() { connections->decCapacity(); });
- dumpPath(srcPath, conn->to, filter);
- }
- conn->to.warn = false;
- conn.processStderr();
- } catch (SysError & e) {
- /* Daemon closed while we were sending the path. Probably OOM
- or I/O error. */
- if (e.errNo == EPIPE)
- try {
- conn.processStderr();
- } catch (EndOfFile & e) { }
- throw;
- }
-
- return parseStorePath(readString(conn->from));
-}
-
-
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
- conn->to << wopAddTextToStore << name << s;
- writeStorePaths(*this, conn->to, references);
-
- conn.processStderr();
- return parseStorePath(readString(conn->from));
+ StringSource source(s);
+ return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
}
@@ -993,6 +964,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
+void
+ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink((*this)->to, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
+
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
}
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index 91c748006..735a3c24e 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -63,13 +63,16 @@ public:
void querySubstitutablePathInfos(const StorePathCAMap & paths,
SubstitutablePathInfos & infos) override;
+ /* Add a content-addressable store path. `dump` will be drained. */
+ ref<const ValidPathInfo> addCAToStore(Source & dump, const string & name, ContentAddressMethod caMethod, StorePathSet references, RepairFlag repair);
+
+ /* Add a content-addressable store path. Does not support references. `dump` will be drained. */
+ StorePath addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair) override;
+
void addToStore(const ValidPathInfo & info, Source & nar,
RepairFlag repair, CheckSigsFlag checkSigs) override;
- StorePath addToStore(const string & name, const Path & srcPath,
- FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256,
- PathFilter & filter = defaultPathFilter, RepairFlag repair = NoRepair) override;
-
StorePath addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair) override;
@@ -139,6 +142,8 @@ protected:
virtual void narFromPath(const StorePath & path, Sink & sink) override;
+ ref<const ValidPathInfo> readValidPathInfo(ConnectionHandle & conn, const StorePath & path);
+
private:
std::atomic_bool failed{false};
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index 4d3f07dfc..591140874 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -449,7 +449,8 @@ public:
/* Like addToStore(), but the contents of the path are contained
in `dump', which is either a NAR serialisation (if recursive ==
true) or simply the contents of a regular file (if recursive ==
- false). */
+ false).
+ `dump` may be drained */
// FIXME: remove?
virtual StorePath addToStoreFromDump(Source & dump, const string & name,
FileIngestionMethod method = FileIngestionMethod::Recursive, HashType hashAlgo = htSHA256, RepairFlag repair = NoRepair)
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 13cf8d4ab..b100d1550 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f
-#define PROTOCOL_VERSION 0x118
+#define PROTOCOL_VERSION 0x119
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)
@@ -18,7 +18,7 @@ typedef enum {
wopQueryReferences = 5, // obsolete
wopQueryReferrers = 6,
wopAddToStore = 7,
- wopAddTextToStore = 8,
+ wopAddTextToStore = 8, // obsolete since 1.25, Nix 3.0. Use wopAddToStore
wopBuildPaths = 9,
wopEnsurePath = 10,
wopAddTempRoot = 11,
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 00c945113..a469a1e73 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -93,7 +93,7 @@ void Source::operator () (unsigned char * data, size_t len)
}
-std::string Source::drain()
+void Source::drainInto(Sink & sink)
{
std::string s;
std::vector<unsigned char> buf(8192);
@@ -101,12 +101,19 @@ std::string Source::drain()
size_t n;
try {
n = read(buf.data(), buf.size());
- s.append((char *) buf.data(), n);
+ sink(buf.data(), n);
} catch (EndOfFile &) {
break;
}
}
- return s;
+}
+
+
+std::string Source::drain()
+{
+ StringSink s;
+ drainInto(s);
+ return *s.s;
}
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 7682a0f19..b41e58f33 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -69,6 +69,8 @@ struct Source
virtual bool good() { return true; }
+ void drainInto(Sink & sink);
+
std::string drain();
};
@@ -404,4 +406,93 @@ struct StreamToSourceAdapter : Source
};
+/* A source that reads a distinct format of concatenated chunks back into its
+ logical form, in order to guarantee a known state to the original stream,
+ even in the event of errors.
+
+ Use with FramedSink, which also allows the logical stream to be terminated
+ in the event of an exception.
+*/
+struct FramedSource : Source
+{
+ Source & from;
+ bool eof = false;
+ std::vector<unsigned char> pending;
+ size_t pos = 0;
+
+ FramedSource(Source & from) : from(from)
+ { }
+
+ ~FramedSource()
+ {
+ if (!eof) {
+ while (true) {
+ auto n = readInt(from);
+ if (!n) break;
+ std::vector<unsigned char> data(n);
+ from(data.data(), n);
+ }
+ }
+ }
+
+ size_t read(unsigned char * data, size_t len) override
+ {
+ if (eof) throw EndOfFile("reached end of FramedSource");
+
+ if (pos >= pending.size()) {
+ size_t len = readInt(from);
+ if (!len) {
+ eof = true;
+ return 0;
+ }
+ pending = std::vector<unsigned char>(len);
+ pos = 0;
+ from(pending.data(), len);
+ }
+
+ auto n = std::min(len, pending.size() - pos);
+ memcpy(data, pending.data() + pos, n);
+ pos += n;
+ return n;
+ }
+};
+
+/* Write as chunks in the format expected by FramedSource.
+
+ The exception_ptr reference can be used to terminate the stream when you
+ detect that an error has occurred on the remote end.
+*/
+struct FramedSink : nix::BufferedSink
+{
+ BufferedSink & to;
+ std::exception_ptr & ex;
+
+ FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
+ { }
+
+ ~FramedSink()
+ {
+ try {
+ to << 0;
+ to.flush();
+ } catch (...) {
+ ignoreException();
+ }
+ }
+
+ void write(const unsigned char * data, size_t len) override
+ {
+ /* Don't send more data if the remote has
+ encountered an error. */
+ if (ex) {
+ auto ex2 = ex;
+ ex = nullptr;
+ std::rethrow_exception(ex2);
+ }
+ to << len;
+ to(data, len);
+ };
+};
+
+
}
diff --git a/src/libutil/split.hh b/src/libutil/split.hh
index d19d7d8ed..87a23b13e 100644
--- a/src/libutil/split.hh
+++ b/src/libutil/split.hh
@@ -9,7 +9,7 @@ namespace nix {
// If `separator` is found, we return the portion of the string before the
// separator, and modify the string argument to contain only the part after the
-// separator. Otherwise, wer return `std::nullopt`, and we leave the argument
+// separator. Otherwise, we return `std::nullopt`, and we leave the argument
// string alone.
static inline std::optional<std::string_view> splitPrefixTo(std::string_view & string, char separator) {
auto sepInstance = string.find(separator);