diff options
-rw-r--r-- | src/libstore/build/derivation-goal.cc | 6 | ||||
-rw-r--r-- | src/libstore/daemon.cc | 64 | ||||
-rw-r--r-- | src/libstore/derivations.cc | 7 | ||||
-rw-r--r-- | src/libstore/export-import.cc | 7 | ||||
-rw-r--r-- | src/libstore/legacy-ssh-store.cc | 52 | ||||
-rw-r--r-- | src/libstore/path-info.cc | 7 | ||||
-rw-r--r-- | src/libstore/remote-store-connection.hh | 97 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 76 | ||||
-rw-r--r-- | src/libstore/remote-store.hh | 16 | ||||
-rw-r--r-- | src/libstore/ssh-store.cc | 1 | ||||
-rw-r--r-- | src/libstore/uds-remote-store.hh | 1 | ||||
-rw-r--r-- | src/libstore/worker-protocol-impl.hh | 40 | ||||
-rw-r--r-- | src/libstore/worker-protocol.cc | 102 | ||||
-rw-r--r-- | src/libstore/worker-protocol.hh | 34 | ||||
-rw-r--r-- | src/nix-store/nix-store.cc | 21 | ||||
-rw-r--r-- | src/nix/daemon.cc | 1 |
16 files changed, 348 insertions, 184 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 1a946e3d2..5e37f7ecb 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -1151,9 +1151,11 @@ HookReply DerivationGoal::tryBuildHook() throw; } + WorkerProto::WriteConn conn { hook->sink }; + /* Tell the hook all the inputs that have to be copied to the remote system. */ - WorkerProto::write(worker.store, hook->sink, inputPaths); + WorkerProto::write(worker.store, conn, inputPaths); /* Tell the hooks the missing outputs that have to be copied back from the remote system. */ @@ -1164,7 +1166,7 @@ HookReply DerivationGoal::tryBuildHook() if (buildMode != bmCheck && status.known && status.known->isValid()) continue; missingOutputs.insert(outputName); } - WorkerProto::write(worker.store, hook->sink, missingOutputs); + WorkerProto::write(worker.store, conn, missingOutputs); } hook->sink = FdSink(); diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 7eba1a79d..75c3d2aca 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -260,13 +260,13 @@ struct ClientSettings } }; -static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from) +static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, WorkerProto::ReadConn conn) { std::vector<DerivedPath> reqs; if (GET_PROTOCOL_MINOR(clientVersion) >= 30) { - reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, from); + reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, conn); } else { - for (auto & s : readStrings<Strings>(from)) + for (auto & s : readStrings<Strings>(conn.from)) reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath()); } return reqs; @@ -276,6 +276,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store, TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion, Source & from, BufferedSink & to, WorkerProto::Op op) { + WorkerProto::ReadConn rconn { .from = from }; + WorkerProto::WriteConn wconn { .to = to }; + switch (op) { case WorkerProto::Op::IsValidPath: { @@ -288,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } case WorkerProto::Op::QueryValidPaths: { - auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from); + auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); SubstituteFlag substitute = NoSubstitute; if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { @@ -301,7 +304,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } auto res = store->queryValidPaths(paths, substitute); logger->stopWork(); - WorkerProto::write(*store, to, res); + WorkerProto::write(*store, wconn, res); break; } @@ -317,11 +320,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } case WorkerProto::Op::QuerySubstitutablePaths: { - auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from); + auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); logger->startWork(); auto res = store->querySubstitutablePaths(paths); logger->stopWork(); - WorkerProto::write(*store, to, res); + WorkerProto::write(*store, wconn, res); break; } @@ -350,7 +353,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, paths = store->queryValidDerivers(path); else paths = store->queryDerivationOutputs(path); logger->stopWork(); - WorkerProto::write(*store, to, paths); + WorkerProto::write(*store, wconn, paths); break; } @@ -368,7 +371,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, logger->startWork(); auto outputs = store->queryPartialDerivationOutputMap(path); logger->stopWork(); - WorkerProto::write(*store, to, outputs); + WorkerProto::write(*store, wconn, outputs); break; } @@ -394,7 +397,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { auto name = readString(from); auto camStr = readString(from); - auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from); + auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); bool repairBool; from >> repairBool; auto repair = RepairFlag{repairBool}; @@ -496,7 +499,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, case WorkerProto::Op::AddTextToStore: { std::string suffix = readString(from); std::string s = readString(from); - auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from); + auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); logger->startWork(); auto path = store->addTextToStore(suffix, s, refs, NoRepair); logger->stopWork(); @@ -528,7 +531,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } case WorkerProto::Op::BuildPaths: { - auto drvs = readDerivedPaths(*store, clientVersion, from); + auto drvs = readDerivedPaths(*store, clientVersion, rconn); BuildMode mode = bmNormal; if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { mode = (BuildMode) readInt(from); @@ -553,7 +556,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } case WorkerProto::Op::BuildPathsWithResults: { - auto drvs = readDerivedPaths(*store, clientVersion, from); + auto drvs = readDerivedPaths(*store, clientVersion, rconn); BuildMode mode = bmNormal; mode = (BuildMode) readInt(from); @@ -568,7 +571,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, auto results = store->buildPathsWithResults(drvs, mode); logger->stopWork(); - WorkerProto::write(*store, to, results); + WorkerProto::write(*store, wconn, results); break; } @@ -645,7 +648,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, DrvOutputs builtOutputs; for (auto & [output, realisation] : res.builtOutputs) builtOutputs.insert_or_assign(realisation.id, realisation); - WorkerProto::write(*store, to, builtOutputs); + WorkerProto::write(*store, wconn, builtOutputs); } break; } @@ -710,7 +713,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, case WorkerProto::Op::CollectGarbage: { GCOptions options; options.action = (GCOptions::GCAction) readInt(from); - options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, from); + options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); from >> options.ignoreLiveness >> options.maxFreed; // obsolete fields readInt(from); @@ -780,7 +783,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, else { to << 1 << (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); - WorkerProto::write(*store, to, i->second.references); + WorkerProto::write(*store, wconn, i->second.references); to << i->second.downloadSize << i->second.narSize; } @@ -791,11 +794,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store, SubstitutablePathInfos infos; StorePathCAMap pathsMap = {}; if (GET_PROTOCOL_MINOR(clientVersion) < 22) { - auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from); + auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); for (auto & path : paths) pathsMap.emplace(path, std::nullopt); } else - pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, from); + pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, rconn); logger->startWork(); store->querySubstitutablePathInfos(pathsMap, infos); logger->stopWork(); @@ -803,7 +806,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, for (auto & i : infos) { to << store->printStorePath(i.first) << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); - WorkerProto::write(*store, to, i.second.references); + WorkerProto::write(*store, wconn, i.second.references); to << i.second.downloadSize << i.second.narSize; } break; @@ -813,7 +816,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, logger->startWork(); auto paths = store->queryAllValidPaths(); logger->stopWork(); - WorkerProto::write(*store, to, paths); + WorkerProto::write(*store, wconn, paths); break; } @@ -885,7 +888,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, ValidPathInfo info { path, narHash }; if (deriver != "") info.deriver = store->parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*store, from); + info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); from >> info.registrationTime >> info.narSize >> info.ultimate; info.sigs = readStrings<StringSet>(from); info.ca = ContentAddress::parseOpt(readString(from)); @@ -930,15 +933,15 @@ static void performOp(TunnelLogger * logger, ref<Store> store, } case WorkerProto::Op::QueryMissing: { - auto targets = readDerivedPaths(*store, clientVersion, from); + auto targets = readDerivedPaths(*store, clientVersion, rconn); logger->startWork(); StorePathSet willBuild, willSubstitute, unknown; uint64_t downloadSize, narSize; store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); logger->stopWork(); - WorkerProto::write(*store, to, willBuild); - WorkerProto::write(*store, to, willSubstitute); - WorkerProto::write(*store, to, unknown); + WorkerProto::write(*store, wconn, willBuild); + WorkerProto::write(*store, wconn, willSubstitute); + WorkerProto::write(*store, wconn, unknown); to << downloadSize << narSize; break; } @@ -951,7 +954,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store, store->registerDrvOutput(Realisation{ .id = outputId, .outPath = outputPath}); } else { - auto realisation = WorkerProto::Serialise<Realisation>::read(*store, from); + auto realisation = WorkerProto::Serialise<Realisation>::read(*store, rconn); store->registerDrvOutput(realisation); } logger->stopWork(); @@ -966,11 +969,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store, if (GET_PROTOCOL_MINOR(clientVersion) < 31) { std::set<StorePath> outPaths; if (info) outPaths.insert(info->outPath); - WorkerProto::write(*store, to, outPaths); + WorkerProto::write(*store, wconn, outPaths); } else { std::set<Realisation> realisations; if (info) realisations.insert(*info); - WorkerProto::write(*store, to, realisations); + WorkerProto::write(*store, wconn, realisations); } break; } @@ -1050,7 +1053,8 @@ void processConnection( auto temp = trusted ? store->isTrustedClient() : std::optional { NotTrusted }; - WorkerProto::write(*store, to, temp); + WorkerProto::WriteConn wconn { .to = to }; + WorkerProto::write(*store, wconn, temp); } /* Send startup error messages to the client. */ diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index 2295ff9e8..6f63685d4 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -750,7 +750,8 @@ Source & readDerivation(Source & in, const Store & store, BasicDerivation & drv, drv.outputs.emplace(std::move(name), std::move(output)); } - drv.inputSrcs = WorkerProto::Serialise<StorePathSet>::read(store, in); + drv.inputSrcs = WorkerProto::Serialise<StorePathSet>::read(store, + WorkerProto::ReadConn { .from = in }); in >> drv.platform >> drv.builder; drv.args = readStrings<Strings>(in); @@ -798,7 +799,9 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr }, }, i.second.raw()); } - WorkerProto::write(store, out, drv.inputSrcs); + WorkerProto::write(store, + WorkerProto::WriteConn { .to = out }, + drv.inputSrcs); out << drv.platform << drv.builder << drv.args; out << drv.env.size(); for (auto & i : drv.env) diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index b4100bace..e866aeb42 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -46,7 +46,9 @@ void Store::exportPath(const StorePath & path, Sink & sink) teeSink << exportMagic << printStorePath(path); - WorkerProto::write(*this, teeSink, info->references); + WorkerProto::write(*this, + WorkerProto::WriteConn { .to = teeSink }, + info->references); teeSink << (info->deriver ? printStorePath(*info->deriver) : "") << 0; @@ -74,7 +76,8 @@ StorePaths Store::importPaths(Source & source, CheckSigsFlag checkSigs) //Activity act(*logger, lvlInfo, "importing path '%s'", info.path); - auto references = WorkerProto::Serialise<StorePathSet>::read(*this, source); + auto references = WorkerProto::Serialise<StorePathSet>::read(*this, + WorkerProto::ReadConn { .from = source }); auto deriver = readString(source); auto narHash = hashString(htSHA256, saved.s); diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 55ecab0ff..fa17d606d 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -48,6 +48,42 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor FdSource from; int remoteVersion; bool good = true; + + /** + * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the + * factored out worker protocol searlizers with a + * `LegacySSHStore::Connection`. + * + * The worker protocol connection types are unidirectional, unlike + * this type. + * + * @todo Use server protocol serializers, not worker protocol + * serializers, once we have made that distiction. + */ + operator WorkerProto::ReadConn () + { + return WorkerProto::ReadConn { + .from = from, + }; + } + + /* + * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the + * factored out worker protocol searlizers with a + * `LegacySSHStore::Connection`. + * + * The worker protocol connection types are unidirectional, unlike + * this type. + * + * @todo Use server protocol serializers, not worker protocol + * serializers, once we have made that distiction. + */ + operator WorkerProto::WriteConn () + { + return WorkerProto::WriteConn { + .to = to, + }; + } }; std::string host; @@ -147,7 +183,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor auto deriver = readString(conn->from); if (deriver != "") info->deriver = parseStorePath(deriver); - info->references = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + info->references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); readLongLong(conn->from); // download size info->narSize = readLongLong(conn->from); @@ -181,7 +217,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - WorkerProto::write(*this, conn->to, info.references); + WorkerProto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize @@ -210,7 +246,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to << exportMagic << printStorePath(info.path); - WorkerProto::write(*this, conn->to, info.references); + WorkerProto::write(*this, *conn, info.references); conn->to << (info.deriver ? printStorePath(*info.deriver) : "") << 0 @@ -295,7 +331,7 @@ public: if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { - auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, conn->from); + auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn); for (auto && [output, realisation] : builtOutputs) status.builtOutputs.insert_or_assign( std::move(output.outputName), @@ -370,10 +406,10 @@ public: conn->to << ServeProto::Command::QueryClosure << includeOutputs; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); conn->to.flush(); - for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, conn->from)) + for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn)) out.insert(i); } @@ -386,10 +422,10 @@ public: << ServeProto::Command::QueryValidPaths << false // lock << maybeSubstitute; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); conn->to.flush(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } void connect() override diff --git a/src/libstore/path-info.cc b/src/libstore/path-info.cc index 9971de1dd..981bbfb14 100644 --- a/src/libstore/path-info.cc +++ b/src/libstore/path-info.cc @@ -133,7 +133,8 @@ ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned auto narHash = Hash::parseAny(readString(source), htSHA256); ValidPathInfo info(path, narHash); if (deriver != "") info.deriver = store.parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(store, source); + info.references = WorkerProto::Serialise<StorePathSet>::read(store, + WorkerProto::ReadConn { .from = source }); source >> info.registrationTime >> info.narSize; if (format >= 16) { source >> info.ultimate; @@ -154,7 +155,9 @@ void ValidPathInfo::write( sink << store.printStorePath(path); sink << (deriver ? store.printStorePath(*deriver) : "") << narHash.to_string(Base16, false); - WorkerProto::write(store, sink, references); + WorkerProto::write(store, + WorkerProto::WriteConn { .to = sink }, + references); sink << registrationTime << narSize; if (format >= 16) { sink << ultimate diff --git a/src/libstore/remote-store-connection.hh b/src/libstore/remote-store-connection.hh new file mode 100644 index 000000000..d32d91a60 --- /dev/null +++ b/src/libstore/remote-store-connection.hh @@ -0,0 +1,97 @@ +#include "remote-store.hh" +#include "worker-protocol.hh" + +namespace nix { + +/** + * Bidirectional connection (send and receive) used by the Remote Store + * implementation. + * + * Contains `Source` and `Sink` for actual communication, along with + * other information learned when negotiating the connection. + */ +struct RemoteStore::Connection +{ + /** + * Send with this. + */ + FdSink to; + + /** + * Receive with this. + */ + FdSource from; + + /** + * Worker protocol version used for the connection. + * + * Despite its name, I think it is actually the maximum version both + * sides support. (If the maximum doesn't exist, we would fail to + * establish a connection and produce a value of this type.) + */ + unsigned int daemonVersion; + + /** + * Whether the remote side trusts us or not. + * + * 3 values: "yes", "no", or `std::nullopt` for "unknown". + * + * Note that the "remote side" might not be just the end daemon, but + * also an intermediary forwarder that can make its own trusting + * decisions. This would be the intersection of all their trust + * decisions, since it takes only one link in the chain to start + * denying operations. + */ + std::optional<TrustedFlag> remoteTrustsUs; + + /** + * The version of the Nix daemon that is processing our requests. + * + * Do note, it may or may not communicating with another daemon, + * rather than being an "end" `LocalStore` or similar. + */ + std::optional<std::string> daemonNixVersion; + + /** + * Time this connection was established. + */ + std::chrono::time_point<std::chrono::steady_clock> startTime; + + /** + * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the + * factored out worker protocol searlizers with a + * `RemoteStore::Connection`. + * + * The worker protocol connection types are unidirectional, unlike + * this type. + */ + operator WorkerProto::ReadConn () + { + return WorkerProto::ReadConn { + .from = from, + }; + } + + /** + * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the + * factored out worker protocol searlizers with a + * `RemoteStore::Connection`. + * + * The worker protocol connection types are unidirectional, unlike + * this type. + */ + operator WorkerProto::WriteConn () + { + return WorkerProto::WriteConn { + .to = to, + }; + } + + virtual ~Connection(); + + virtual void closeWrite() = 0; + + std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); +}; + +} diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 9253ce09e..1e2104e1f 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -5,6 +5,7 @@ #include "remote-fs-accessor.hh" #include "build-result.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "worker-protocol.hh" #include "worker-protocol-impl.hh" #include "archive.hh" @@ -101,7 +102,7 @@ void RemoteStore::initConnection(Connection & conn) } if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 35) { - conn.remoteTrustsUs = WorkerProto::Serialise<std::optional<TrustedFlag>>::read(*this, conn.from); + conn.remoteTrustsUs = WorkerProto::Serialise<std::optional<TrustedFlag>>::read(*this, conn); } else { // We don't know the answer; protocol to old. conn.remoteTrustsUs = std::nullopt; @@ -185,6 +186,7 @@ struct ConnectionHandle } RemoteStore::Connection * operator -> () { return &*handle; } + RemoteStore::Connection & operator * () { return *handle; } void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true) { @@ -228,12 +230,12 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute return res; } else { conn->to << WorkerProto::Op::QueryValidPaths; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { conn->to << (settings.buildersUseSubstitutes ? 1 : 0); } conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } } @@ -243,7 +245,7 @@ StorePathSet RemoteStore::queryAllValidPaths() auto conn(getConnection()); conn->to << WorkerProto::Op::QueryAllValidPaths; conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -260,9 +262,9 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) return res; } else { conn->to << WorkerProto::Op::QuerySubstitutablePaths; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } } @@ -284,7 +286,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); infos.insert_or_assign(i.first, std::move(info)); @@ -297,9 +299,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); } else - WorkerProto::write(*this, conn->to, pathsMap); + WorkerProto::write(*this, *conn, pathsMap); conn.processStderr(); size_t count = readNum<size_t>(conn->from); for (size_t n = 0; n < count; n++) { @@ -307,7 +309,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); } @@ -350,7 +352,7 @@ void RemoteStore::queryReferrers(const StorePath & path, auto conn(getConnection()); conn->to << WorkerProto::Op::QueryReferrers << printStorePath(path); conn.processStderr(); - for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, conn->from)) + for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn)) referrers.insert(i); } @@ -360,7 +362,7 @@ StorePathSet RemoteStore::queryValidDerivers(const StorePath & path) auto conn(getConnection()); conn->to << WorkerProto::Op::QueryValidDerivers << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -372,7 +374,7 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path) auto conn(getConnection()); conn->to << WorkerProto::Op::QueryDerivationOutputs << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -382,7 +384,7 @@ std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivat auto conn(getConnection()); conn->to << WorkerProto::Op::QueryDerivationOutputMap << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<std::map<std::string, std::optional<StorePath>>>::read(*this, conn->from); + return WorkerProto::Serialise<std::map<std::string, std::optional<StorePath>>>::read(*this, *conn); } else { // Fallback for old daemon versions. // For floating-CA derivations (and their co-dependencies) this is an @@ -428,7 +430,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( << WorkerProto::Op::AddToStore << name << caMethod.render(hashType); - WorkerProto::write(*this, conn->to, references); + WorkerProto::write(*this, *conn, references); conn->to << repair; // The dump source may invoke the store, so we need to make some room. @@ -453,7 +455,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( name, printHashType(hashType)); std::string s = dump.drain(); conn->to << WorkerProto::Op::AddTextToStore << name << s; - WorkerProto::write(*this, conn->to, references); + WorkerProto::write(*this, *conn, references); conn.processStderr(); }, [&](const FileIngestionMethod & fim) -> void { @@ -519,7 +521,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, sink << exportMagic << printStorePath(info.path); - WorkerProto::write(*this, sink, info.references); + WorkerProto::write(*this, *conn, info.references); sink << (info.deriver ? printStorePath(*info.deriver) : "") << 0 // == no legacy signature @@ -529,7 +531,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, conn.processStderr(0, source2.get()); - auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); assert(importedPaths.size() <= 1); } @@ -538,7 +540,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - WorkerProto::write(*this, conn->to, info.references); + WorkerProto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; @@ -611,7 +613,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { - WorkerProto::write(*this, conn->to, info); + WorkerProto::write(*this, *conn, info); } conn.processStderr(); } @@ -634,13 +636,13 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, auto real = [&]() -> std::shared_ptr<const Realisation> { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read( - *this, conn->from); + *this, *conn); if (outPaths.empty()) return nullptr; return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() }); } else { auto realisations = WorkerProto::Serialise<std::set<Realisation>>::read( - *this, conn->from); + *this, *conn); if (realisations.empty()) return nullptr; return std::make_shared<const Realisation>(*realisations.begin()); @@ -651,10 +653,10 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, } catch (...) { return callback.rethrow(); } } -static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs) +static void writeDerivedPaths(RemoteStore & store, RemoteStore::Connection & conn, const std::vector<DerivedPath> & reqs) { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 30) { - WorkerProto::write(store, conn->to, reqs); + if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 30) { + WorkerProto::write(store, conn, reqs); } else { Strings ss; for (auto & p : reqs) { @@ -666,12 +668,12 @@ static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, cons [&](const StorePath & drvPath) { throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", store.printStorePath(drvPath), - GET_PROTOCOL_MAJOR(conn->daemonVersion), - GET_PROTOCOL_MINOR(conn->daemonVersion)); + GET_PROTOCOL_MAJOR(conn.daemonVersion), + GET_PROTOCOL_MINOR(conn.daemonVersion)); }, }, sOrDrvPath); } - conn->to << ss; + conn.to << ss; } } @@ -697,7 +699,7 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod auto conn(getConnection()); conn->to << WorkerProto::Op::BuildPaths; assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); - writeDerivedPaths(*this, conn, drvPaths); + writeDerivedPaths(*this, *conn, drvPaths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) conn->to << buildMode; else @@ -721,10 +723,10 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults( if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { conn->to << WorkerProto::Op::BuildPathsWithResults; - writeDerivedPaths(*this, conn, paths); + writeDerivedPaths(*this, *conn, paths); conn->to << buildMode; conn.processStderr(); - return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, conn->from); + return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, *conn); } else { // Avoid deadlock. conn_.reset(); @@ -807,7 +809,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime; } if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) { - auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, conn->from); + auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn); for (auto && [output, realisation] : builtOutputs) res.builtOutputs.insert_or_assign( std::move(output.outputName), @@ -866,7 +868,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) conn->to << WorkerProto::Op::CollectGarbage << options.action; - WorkerProto::write(*this, conn->to, options.pathsToDelete); + WorkerProto::write(*this, *conn, options.pathsToDelete); conn->to << options.ignoreLiveness << options.maxFreed /* removed options */ @@ -923,11 +925,11 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets, // to prevent a deadlock. goto fallback; conn->to << WorkerProto::Op::QueryMissing; - writeDerivedPaths(*this, conn, targets); + writeDerivedPaths(*this, *conn, targets); conn.processStderr(); - willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); - willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); - unknown = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); + willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); + unknown = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); conn->from >> downloadSize >> narSize; return; } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 4f3971bfd..cb7a71acf 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -166,21 +166,7 @@ public: void flushBadConnections(); - struct Connection - { - FdSink to; - FdSource from; - unsigned int daemonVersion; - std::optional<TrustedFlag> remoteTrustsUs; - std::optional<std::string> daemonNixVersion; - std::chrono::time_point<std::chrono::steady_clock> startTime; - - virtual ~Connection(); - - virtual void closeWrite() = 0; - - std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); - }; + struct Connection; ref<Connection> openConnectionWrapper(); diff --git a/src/libstore/ssh-store.cc b/src/libstore/ssh-store.cc index 962221ad2..0200076c0 100644 --- a/src/libstore/ssh-store.cc +++ b/src/libstore/ssh-store.cc @@ -1,6 +1,7 @@ #include "ssh-store-config.hh" #include "store-api.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "remote-fs-accessor.hh" #include "archive.hh" #include "worker-protocol.hh" diff --git a/src/libstore/uds-remote-store.hh b/src/libstore/uds-remote-store.hh index 3bbab371c..2bd6517fa 100644 --- a/src/libstore/uds-remote-store.hh +++ b/src/libstore/uds-remote-store.hh @@ -2,6 +2,7 @@ ///@file #include "remote-store.hh" +#include "remote-store-connection.hh" #include "local-fs-store.hh" namespace nix { diff --git a/src/libstore/worker-protocol-impl.hh b/src/libstore/worker-protocol-impl.hh index f34e55c54..d3d2792ff 100644 --- a/src/libstore/worker-protocol-impl.hh +++ b/src/libstore/worker-protocol-impl.hh @@ -13,65 +13,65 @@ namespace nix { template<typename T> -std::vector<T> WorkerProto::Serialise<std::vector<T>>::read(const Store & store, Source & from) +std::vector<T> WorkerProto::Serialise<std::vector<T>>::read(const Store & store, WorkerProto::ReadConn conn) { std::vector<T> resSet; - auto size = readNum<size_t>(from); + auto size = readNum<size_t>(conn.from); while (size--) { - resSet.push_back(WorkerProto::Serialise<T>::read(store, from)); + resSet.push_back(WorkerProto::Serialise<T>::read(store, conn)); } return resSet; } template<typename T> -void WorkerProto::Serialise<std::vector<T>>::write(const Store & store, Sink & out, const std::vector<T> & resSet) +void WorkerProto::Serialise<std::vector<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::vector<T> & resSet) { - out << resSet.size(); + conn.to << resSet.size(); for (auto & key : resSet) { - WorkerProto::Serialise<T>::write(store, out, key); + WorkerProto::Serialise<T>::write(store, conn, key); } } template<typename T> -std::set<T> WorkerProto::Serialise<std::set<T>>::read(const Store & store, Source & from) +std::set<T> WorkerProto::Serialise<std::set<T>>::read(const Store & store, WorkerProto::ReadConn conn) { std::set<T> resSet; - auto size = readNum<size_t>(from); + auto size = readNum<size_t>(conn.from); while (size--) { - resSet.insert(WorkerProto::Serialise<T>::read(store, from)); + resSet.insert(WorkerProto::Serialise<T>::read(store, conn)); } return resSet; } template<typename T> -void WorkerProto::Serialise<std::set<T>>::write(const Store & store, Sink & out, const std::set<T> & resSet) +void WorkerProto::Serialise<std::set<T>>::write(const Store & store, WorkerProto::WriteConn conn, const std::set<T> & resSet) { - out << resSet.size(); + conn.to << resSet.size(); for (auto & key : resSet) { - WorkerProto::Serialise<T>::write(store, out, key); + WorkerProto::Serialise<T>::write(store, conn, key); } } template<typename K, typename V> -std::map<K, V> WorkerProto::Serialise<std::map<K, V>>::read(const Store & store, Source & from) +std::map<K, V> WorkerProto::Serialise<std::map<K, V>>::read(const Store & store, WorkerProto::ReadConn conn) { std::map<K, V> resMap; - auto size = readNum<size_t>(from); + auto size = readNum<size_t>(conn.from); while (size--) { - auto k = WorkerProto::Serialise<K>::read(store, from); - auto v = WorkerProto::Serialise<V>::read(store, from); + auto k = WorkerProto::Serialise<K>::read(store, conn); + auto v = WorkerProto::Serialise<V>::read(store, conn); resMap.insert_or_assign(std::move(k), std::move(v)); } return resMap; } template<typename K, typename V> -void WorkerProto::Serialise<std::map<K, V>>::write(const Store & store, Sink & out, const std::map<K, V> & resMap) +void WorkerProto::Serialise<std::map<K, V>>::write(const Store & store, WorkerProto::WriteConn conn, const std::map<K, V> & resMap) { - out << resMap.size(); + conn.to << resMap.size(); for (auto & i : resMap) { - WorkerProto::Serialise<K>::write(store, out, i.first); - WorkerProto::Serialise<V>::write(store, out, i.second); + WorkerProto::Serialise<K>::write(store, conn, i.first); + WorkerProto::Serialise<V>::write(store, conn, i.second); } } diff --git a/src/libstore/worker-protocol.cc b/src/libstore/worker-protocol.cc index 0bf9e4d68..a23130743 100644 --- a/src/libstore/worker-protocol.cc +++ b/src/libstore/worker-protocol.cc @@ -12,31 +12,31 @@ namespace nix { -std::string WorkerProto::Serialise<std::string>::read(const Store & store, Source & from) +std::string WorkerProto::Serialise<std::string>::read(const Store & store, WorkerProto::ReadConn conn) { - return readString(from); + return readString(conn.from); } -void WorkerProto::Serialise<std::string>::write(const Store & store, Sink & out, const std::string & str) +void WorkerProto::Serialise<std::string>::write(const Store & store, WorkerProto::WriteConn conn, const std::string & str) { - out << str; + conn.to << str; } -StorePath WorkerProto::Serialise<StorePath>::read(const Store & store, Source & from) +StorePath WorkerProto::Serialise<StorePath>::read(const Store & store, WorkerProto::ReadConn conn) { - return store.parseStorePath(readString(from)); + return store.parseStorePath(readString(conn.from)); } -void WorkerProto::Serialise<StorePath>::write(const Store & store, Sink & out, const StorePath & storePath) +void WorkerProto::Serialise<StorePath>::write(const Store & store, WorkerProto::WriteConn conn, const StorePath & storePath) { - out << store.printStorePath(storePath); + conn.to << store.printStorePath(storePath); } -std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::read(const Store & store, Source & from) +std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::read(const Store & store, WorkerProto::ReadConn conn) { - auto temp = readNum<uint8_t>(from); + auto temp = readNum<uint8_t>(conn.from); switch (temp) { case 0: return std::nullopt; @@ -49,17 +49,17 @@ std::optional<TrustedFlag> WorkerProto::Serialise<std::optional<TrustedFlag>>::r } } -void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & store, Sink & out, const std::optional<TrustedFlag> & optTrusted) +void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<TrustedFlag> & optTrusted) { if (!optTrusted) - out << (uint8_t)0; + conn.to << (uint8_t)0; else { switch (*optTrusted) { case Trusted: - out << (uint8_t)1; + conn.to << (uint8_t)1; break; case NotTrusted: - out << (uint8_t)2; + conn.to << (uint8_t)2; break; default: assert(false); @@ -68,83 +68,83 @@ void WorkerProto::Serialise<std::optional<TrustedFlag>>::write(const Store & sto } -ContentAddress WorkerProto::Serialise<ContentAddress>::read(const Store & store, Source & from) +ContentAddress WorkerProto::Serialise<ContentAddress>::read(const Store & store, WorkerProto::ReadConn conn) { - return ContentAddress::parse(readString(from)); + return ContentAddress::parse(readString(conn.from)); } -void WorkerProto::Serialise<ContentAddress>::write(const Store & store, Sink & out, const ContentAddress & ca) +void WorkerProto::Serialise<ContentAddress>::write(const Store & store, WorkerProto::WriteConn conn, const ContentAddress & ca) { - out << renderContentAddress(ca); + conn.to << renderContentAddress(ca); } -DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, Source & from) +DerivedPath WorkerProto::Serialise<DerivedPath>::read(const Store & store, WorkerProto::ReadConn conn) { - auto s = readString(from); + auto s = readString(conn.from); return DerivedPath::parseLegacy(store, s); } -void WorkerProto::Serialise<DerivedPath>::write(const Store & store, Sink & out, const DerivedPath & req) +void WorkerProto::Serialise<DerivedPath>::write(const Store & store, WorkerProto::WriteConn conn, const DerivedPath & req) { - out << req.to_string_legacy(store); + conn.to << req.to_string_legacy(store); } -Realisation WorkerProto::Serialise<Realisation>::read(const Store & store, Source & from) +Realisation WorkerProto::Serialise<Realisation>::read(const Store & store, WorkerProto::ReadConn conn) { - std::string rawInput = readString(from); + std::string rawInput = readString(conn.from); return Realisation::fromJSON( nlohmann::json::parse(rawInput), "remote-protocol" ); } -void WorkerProto::Serialise<Realisation>::write(const Store & store, Sink & out, const Realisation & realisation) +void WorkerProto::Serialise<Realisation>::write(const Store & store, WorkerProto::WriteConn conn, const Realisation & realisation) { - out << realisation.toJSON().dump(); + conn.to << realisation.toJSON().dump(); } -DrvOutput WorkerProto::Serialise<DrvOutput>::read(const Store & store, Source & from) +DrvOutput WorkerProto::Serialise<DrvOutput>::read(const Store & store, WorkerProto::ReadConn conn) { - return DrvOutput::parse(readString(from)); + return DrvOutput::parse(readString(conn.from)); } -void WorkerProto::Serialise<DrvOutput>::write(const Store & store, Sink & out, const DrvOutput & drvOutput) +void WorkerProto::Serialise<DrvOutput>::write(const Store & store, WorkerProto::WriteConn conn, const DrvOutput & drvOutput) { - out << drvOutput.to_string(); + conn.to << drvOutput.to_string(); } -KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & store, Source & from) +KeyedBuildResult WorkerProto::Serialise<KeyedBuildResult>::read(const Store & store, WorkerProto::ReadConn conn) { - auto path = WorkerProto::Serialise<DerivedPath>::read(store, from); - auto br = WorkerProto::Serialise<BuildResult>::read(store, from); + auto path = WorkerProto::Serialise<DerivedPath>::read(store, conn); + auto br = WorkerProto::Serialise<BuildResult>::read(store, conn); return KeyedBuildResult { std::move(br), /* .path = */ std::move(path), }; } -void WorkerProto::Serialise<KeyedBuildResult>::write(const Store & store, Sink & to, const KeyedBuildResult & res) +void WorkerProto::Serialise<KeyedBuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const KeyedBuildResult & res) { - WorkerProto::write(store, to, res.path); - WorkerProto::write(store, to, static_cast<const BuildResult &>(res)); + WorkerProto::write(store, conn, res.path); + WorkerProto::write(store, conn, static_cast<const BuildResult &>(res)); } -BuildResult WorkerProto::Serialise<BuildResult>::read(const Store & store, Source & from) +BuildResult WorkerProto::Serialise<BuildResult>::read(const Store & store, WorkerProto::ReadConn conn) { BuildResult res; - res.status = (BuildResult::Status) readInt(from); - from + res.status = (BuildResult::Status) readInt(conn.from); + conn.from >> res.errorMsg >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime; - auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(store, from); + auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(store, conn); for (auto && [output, realisation] : builtOutputs) res.builtOutputs.insert_or_assign( std::move(output.outputName), @@ -152,9 +152,9 @@ BuildResult WorkerProto::Serialise<BuildResult>::read(const Store & store, Sourc return res; } -void WorkerProto::Serialise<BuildResult>::write(const Store & store, Sink & to, const BuildResult & res) +void WorkerProto::Serialise<BuildResult>::write(const Store & store, WorkerProto::WriteConn conn, const BuildResult & res) { - to + conn.to << res.status << res.errorMsg << res.timesBuilt @@ -164,30 +164,30 @@ void WorkerProto::Serialise<BuildResult>::write(const Store & store, Sink & to, DrvOutputs builtOutputs; for (auto & [output, realisation] : res.builtOutputs) builtOutputs.insert_or_assign(realisation.id, realisation); - WorkerProto::write(store, to, builtOutputs); + WorkerProto::write(store, conn, builtOutputs); } -std::optional<StorePath> WorkerProto::Serialise<std::optional<StorePath>>::read(const Store & store, Source & from) +std::optional<StorePath> WorkerProto::Serialise<std::optional<StorePath>>::read(const Store & store, WorkerProto::ReadConn conn) { - auto s = readString(from); + auto s = readString(conn.from); return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s); } -void WorkerProto::Serialise<std::optional<StorePath>>::write(const Store & store, Sink & out, const std::optional<StorePath> & storePathOpt) +void WorkerProto::Serialise<std::optional<StorePath>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<StorePath> & storePathOpt) { - out << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); + conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); } -std::optional<ContentAddress> WorkerProto::Serialise<std::optional<ContentAddress>>::read(const Store & store, Source & from) +std::optional<ContentAddress> WorkerProto::Serialise<std::optional<ContentAddress>>::read(const Store & store, WorkerProto::ReadConn conn) { - return ContentAddress::parseOpt(readString(from)); + return ContentAddress::parseOpt(readString(conn.from)); } -void WorkerProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, Sink & out, const std::optional<ContentAddress> & caOpt) +void WorkerProto::Serialise<std::optional<ContentAddress>>::write(const Store & store, WorkerProto::WriteConn conn, const std::optional<ContentAddress> & caOpt) { - out << (caOpt ? renderContentAddress(*caOpt) : ""); + conn.to << (caOpt ? renderContentAddress(*caOpt) : ""); } } diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index cd6801290..ff762c924 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -50,6 +50,28 @@ struct WorkerProto enum struct Op : uint64_t; /** + * A unidirectional read connection, to be used by the read half of the + * canonical serializers below. + * + * This currently is just a `Source &`, but more fields will be added + * later. + */ + struct ReadConn { + Source & from; + }; + + /** + * A unidirectional write connection, to be used by the write half of the + * canonical serializers below. + * + * This currently is just a `Sink &`, but more fields will be added + * later. + */ + struct WriteConn { + Sink & to; + }; + + /** * Data type for canonical pairs of serialisers for the worker protocol. * * See https://en.cppreference.com/w/cpp/language/adl for the broader @@ -75,8 +97,8 @@ struct WorkerProto // This makes for a quicker debug cycle, as desired. #if 0 { - static T read(const Store & store, Source & from); - static void write(const Store & store, Sink & out, const T & t); + static T read(const Store & store, ReadConn conn); + static void write(const Store & store, WriteConn conn, const T & t); }; #endif @@ -85,9 +107,9 @@ struct WorkerProto * infer the type instead of having to write it down explicitly. */ template<typename T> - static void write(const Store & store, Sink & out, const T & t) + static void write(const Store & store, WriteConn conn, const T & t) { - WorkerProto::Serialise<T>::write(store, out, t); + WorkerProto::Serialise<T>::write(store, conn, t); } }; @@ -171,8 +193,8 @@ inline std::ostream & operator << (std::ostream & s, WorkerProto::Op op) */ #define MAKE_WORKER_PROTO(T) \ struct WorkerProto::Serialise< T > { \ - static T read(const Store & store, Source & from); \ - static void write(const Store & store, Sink & out, const T & t); \ + static T read(const Store & store, WorkerProto::ReadConn conn); \ + static void write(const Store & store, WorkerProto::WriteConn conn, const T & t); \ }; template<> diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index 062c68ff8..caa0248f1 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -807,6 +807,9 @@ static void opServe(Strings opFlags, Strings opArgs) out.flush(); unsigned int clientVersion = readInt(in); + WorkerProto::ReadConn rconn { .from = in }; + WorkerProto::WriteConn wconn { .to = out }; + auto getBuildSettings = [&]() { // FIXME: changing options here doesn't work if we're // building through the daemon. @@ -850,7 +853,7 @@ static void opServe(Strings opFlags, Strings opArgs) case ServeProto::Command::QueryValidPaths: { bool lock = readInt(in); bool substitute = readInt(in); - auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, in); + auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); if (lock && writeAllowed) for (auto & path : paths) store->addTempRoot(path); @@ -859,19 +862,19 @@ static void opServe(Strings opFlags, Strings opArgs) store->substitutePaths(paths); } - WorkerProto::write(*store, out, store->queryValidPaths(paths)); + WorkerProto::write(*store, wconn, store->queryValidPaths(paths)); break; } case ServeProto::Command::QueryPathInfos: { - auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, in); + auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); // !!! Maybe we want a queryPathInfos? for (auto & i : paths) { try { auto info = store->queryPathInfo(i); out << store->printStorePath(info->path) << (info->deriver ? store->printStorePath(*info->deriver) : ""); - WorkerProto::write(*store, out, info->references); + WorkerProto::write(*store, wconn, info->references); // !!! Maybe we want compression? out << info->narSize // downloadSize << info->narSize; @@ -899,7 +902,7 @@ static void opServe(Strings opFlags, Strings opArgs) case ServeProto::Command::ExportPaths: { readInt(in); // obsolete - store->exportPaths(WorkerProto::Serialise<StorePathSet>::read(*store, in), out); + store->exportPaths(WorkerProto::Serialise<StorePathSet>::read(*store, rconn), out); break; } @@ -945,7 +948,7 @@ static void opServe(Strings opFlags, Strings opArgs) DrvOutputs builtOutputs; for (auto & [output, realisation] : status.builtOutputs) builtOutputs.insert_or_assign(realisation.id, realisation); - WorkerProto::write(*store, out, builtOutputs); + WorkerProto::write(*store, wconn, builtOutputs); } break; @@ -954,9 +957,9 @@ static void opServe(Strings opFlags, Strings opArgs) case ServeProto::Command::QueryClosure: { bool includeOutputs = readInt(in); StorePathSet closure; - store->computeFSClosure(WorkerProto::Serialise<StorePathSet>::read(*store, in), + store->computeFSClosure(WorkerProto::Serialise<StorePathSet>::read(*store, rconn), closure, false, includeOutputs); - WorkerProto::write(*store, out, closure); + WorkerProto::write(*store, wconn, closure); break; } @@ -971,7 +974,7 @@ static void opServe(Strings opFlags, Strings opArgs) }; if (deriver != "") info.deriver = store->parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*store, in); + info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn); in >> info.registrationTime >> info.narSize >> info.ultimate; info.sigs = readStrings<StringSet>(in); info.ca = ContentAddress::parseOpt(readString(in)); diff --git a/src/nix/daemon.cc b/src/nix/daemon.cc index 9fe9b3b1e..8e2bcf7e1 100644 --- a/src/nix/daemon.cc +++ b/src/nix/daemon.cc @@ -4,6 +4,7 @@ #include "shared.hh" #include "local-store.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "util.hh" #include "serialise.hh" #include "archive.hh" |