diff options
author | John Ericson <John.Ericson@Obsidian.Systems> | 2023-04-17 13:40:46 -0400 |
---|---|---|
committer | John Ericson <John.Ericson@Obsidian.Systems> | 2023-06-19 12:08:23 -0400 |
commit | 9f69b7dee9fc6035b8aa0cc718f5e74af460d9aa (patch) | |
tree | 167c44235e63dd0ed73b7ee3497ee04ecccfda86 /src/libstore/remote-store.cc | |
parent | 4e8b495ad7dddabc35bf9d6afe3573426ffed15d (diff) |
Create `worker_proto::{Read,Write}Conn`
Pass this around instead of `Source &` and `Sink &` directly. This will
give us something to put the protocol version on once the time comes.
To do this ergonomically, we need to expose `RemoteStore::Connection`,
so do that too. Give it some more API docs while we are at it.
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r-- | src/libstore/remote-store.cc | 76 |
1 files changed, 39 insertions, 37 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 9253ce09e..1e2104e1f 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -5,6 +5,7 @@ #include "remote-fs-accessor.hh" #include "build-result.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "worker-protocol.hh" #include "worker-protocol-impl.hh" #include "archive.hh" @@ -101,7 +102,7 @@ void RemoteStore::initConnection(Connection & conn) } if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 35) { - conn.remoteTrustsUs = WorkerProto::Serialise<std::optional<TrustedFlag>>::read(*this, conn.from); + conn.remoteTrustsUs = WorkerProto::Serialise<std::optional<TrustedFlag>>::read(*this, conn); } else { // We don't know the answer; protocol to old. conn.remoteTrustsUs = std::nullopt; @@ -185,6 +186,7 @@ struct ConnectionHandle } RemoteStore::Connection * operator -> () { return &*handle; } + RemoteStore::Connection & operator * () { return *handle; } void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true) { @@ -228,12 +230,12 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute return res; } else { conn->to << WorkerProto::Op::QueryValidPaths; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { conn->to << (settings.buildersUseSubstitutes ? 1 : 0); } conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } } @@ -243,7 +245,7 @@ StorePathSet RemoteStore::queryAllValidPaths() auto conn(getConnection()); conn->to << WorkerProto::Op::QueryAllValidPaths; conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -260,9 +262,9 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) return res; } else { conn->to << WorkerProto::Op::QuerySubstitutablePaths; - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } } @@ -284,7 +286,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); infos.insert_or_assign(i.first, std::move(info)); @@ -297,9 +299,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); - WorkerProto::write(*this, conn->to, paths); + WorkerProto::write(*this, *conn, paths); } else - WorkerProto::write(*this, conn->to, pathsMap); + WorkerProto::write(*this, *conn, pathsMap); conn.processStderr(); size_t count = readNum<size_t>(conn->from); for (size_t n = 0; n < count; n++) { @@ -307,7 +309,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + info.references = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); } @@ -350,7 +352,7 @@ void RemoteStore::queryReferrers(const StorePath & path, auto conn(getConnection()); conn->to << WorkerProto::Op::QueryReferrers << printStorePath(path); conn.processStderr(); - for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, conn->from)) + for (auto & i : WorkerProto::Serialise<StorePathSet>::read(*this, *conn)) referrers.insert(i); } @@ -360,7 +362,7 @@ StorePathSet RemoteStore::queryValidDerivers(const StorePath & path) auto conn(getConnection()); conn->to << WorkerProto::Op::QueryValidDerivers << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -372,7 +374,7 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path) auto conn(getConnection()); conn->to << WorkerProto::Op::QueryDerivationOutputs << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + return WorkerProto::Serialise<StorePathSet>::read(*this, *conn); } @@ -382,7 +384,7 @@ std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivat auto conn(getConnection()); conn->to << WorkerProto::Op::QueryDerivationOutputMap << printStorePath(path); conn.processStderr(); - return WorkerProto::Serialise<std::map<std::string, std::optional<StorePath>>>::read(*this, conn->from); + return WorkerProto::Serialise<std::map<std::string, std::optional<StorePath>>>::read(*this, *conn); } else { // Fallback for old daemon versions. // For floating-CA derivations (and their co-dependencies) this is an @@ -428,7 +430,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( << WorkerProto::Op::AddToStore << name << caMethod.render(hashType); - WorkerProto::write(*this, conn->to, references); + WorkerProto::write(*this, *conn, references); conn->to << repair; // The dump source may invoke the store, so we need to make some room. @@ -453,7 +455,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( name, printHashType(hashType)); std::string s = dump.drain(); conn->to << WorkerProto::Op::AddTextToStore << name << s; - WorkerProto::write(*this, conn->to, references); + WorkerProto::write(*this, *conn, references); conn.processStderr(); }, [&](const FileIngestionMethod & fim) -> void { @@ -519,7 +521,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, sink << exportMagic << printStorePath(info.path); - WorkerProto::write(*this, sink, info.references); + WorkerProto::write(*this, *conn, info.references); sink << (info.deriver ? printStorePath(*info.deriver) : "") << 0 // == no legacy signature @@ -529,7 +531,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, conn.processStderr(0, source2.get()); - auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + auto importedPaths = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); assert(importedPaths.size() <= 1); } @@ -538,7 +540,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - WorkerProto::write(*this, conn->to, info.references); + WorkerProto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; @@ -611,7 +613,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { - WorkerProto::write(*this, conn->to, info); + WorkerProto::write(*this, *conn, info); } conn.processStderr(); } @@ -634,13 +636,13 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, auto real = [&]() -> std::shared_ptr<const Realisation> { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read( - *this, conn->from); + *this, *conn); if (outPaths.empty()) return nullptr; return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() }); } else { auto realisations = WorkerProto::Serialise<std::set<Realisation>>::read( - *this, conn->from); + *this, *conn); if (realisations.empty()) return nullptr; return std::make_shared<const Realisation>(*realisations.begin()); @@ -651,10 +653,10 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, } catch (...) { return callback.rethrow(); } } -static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs) +static void writeDerivedPaths(RemoteStore & store, RemoteStore::Connection & conn, const std::vector<DerivedPath> & reqs) { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 30) { - WorkerProto::write(store, conn->to, reqs); + if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 30) { + WorkerProto::write(store, conn, reqs); } else { Strings ss; for (auto & p : reqs) { @@ -666,12 +668,12 @@ static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, cons [&](const StorePath & drvPath) { throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", store.printStorePath(drvPath), - GET_PROTOCOL_MAJOR(conn->daemonVersion), - GET_PROTOCOL_MINOR(conn->daemonVersion)); + GET_PROTOCOL_MAJOR(conn.daemonVersion), + GET_PROTOCOL_MINOR(conn.daemonVersion)); }, }, sOrDrvPath); } - conn->to << ss; + conn.to << ss; } } @@ -697,7 +699,7 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod auto conn(getConnection()); conn->to << WorkerProto::Op::BuildPaths; assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); - writeDerivedPaths(*this, conn, drvPaths); + writeDerivedPaths(*this, *conn, drvPaths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) conn->to << buildMode; else @@ -721,10 +723,10 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults( if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { conn->to << WorkerProto::Op::BuildPathsWithResults; - writeDerivedPaths(*this, conn, paths); + writeDerivedPaths(*this, *conn, paths); conn->to << buildMode; conn.processStderr(); - return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, conn->from); + return WorkerProto::Serialise<std::vector<KeyedBuildResult>>::read(*this, *conn); } else { // Avoid deadlock. conn_.reset(); @@ -807,7 +809,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime; } if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) { - auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, conn->from); + auto builtOutputs = WorkerProto::Serialise<DrvOutputs>::read(*this, *conn); for (auto && [output, realisation] : builtOutputs) res.builtOutputs.insert_or_assign( std::move(output.outputName), @@ -866,7 +868,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) conn->to << WorkerProto::Op::CollectGarbage << options.action; - WorkerProto::write(*this, conn->to, options.pathsToDelete); + WorkerProto::write(*this, *conn, options.pathsToDelete); conn->to << options.ignoreLiveness << options.maxFreed /* removed options */ @@ -923,11 +925,11 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets, // to prevent a deadlock. goto fallback; conn->to << WorkerProto::Op::QueryMissing; - writeDerivedPaths(*this, conn, targets); + writeDerivedPaths(*this, *conn, targets); conn.processStderr(); - willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); - willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); - unknown = WorkerProto::Serialise<StorePathSet>::read(*this, conn->from); + willBuild = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); + willSubstitute = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); + unknown = WorkerProto::Serialise<StorePathSet>::read(*this, *conn); conn->from >> downloadSize >> narSize; return; } |