diff options
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r-- | src/libstore/remote-store.cc | 139 |
1 files changed, 89 insertions, 50 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 761b4a087..a627e9cf1 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -162,8 +162,19 @@ void RemoteStore::initConnection(Connection & conn) try { conn.to << WORKER_MAGIC_1; conn.to.flush(); - unsigned int magic = readInt(conn.from); - if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch"); + StringSink saved; + try { + TeeSource tee(conn.from, saved); + unsigned int magic = readInt(tee); + if (magic != WORKER_MAGIC_2) + throw Error("protocol mismatch"); + } catch (SerialisationError & e) { + /* In case the other side is waiting for our input, close + it. */ + conn.closeWrite(); + auto msg = conn.from.drain(); + throw Error("protocol mismatch, got '%s'", chomp(*saved.s + msg)); + } conn.from >> conn.daemonVersion; if (GET_PROTOCOL_MAJOR(conn.daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION)) @@ -222,6 +233,7 @@ void RemoteStore::setOptions(Connection & conn) overrides.erase(settings.buildCores.name); overrides.erase(settings.useSubstitutes.name); overrides.erase(loggerSettings.showTrace.name); + overrides.erase(settings.experimentalFeatures.name); conn.to << overrides.size(); for (auto & i : overrides) conn.to << i.first << i.second.value; @@ -278,6 +290,10 @@ ConnectionHandle RemoteStore::getConnection() return ConnectionHandle(connections->get()); } +void RemoteStore::setOptions() +{ + setOptions(*(getConnection().handle)); +} bool RemoteStore::isValidPathUncached(const StorePath & path) { @@ -386,23 +402,6 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S } -ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path) -{ - auto deriver = readString(conn->from); - auto narHash = Hash::parseAny(readString(conn->from), htSHA256); - auto info = make_ref<ValidPathInfo>(path, narHash); - if (deriver != "") info->deriver = parseStorePath(deriver); - info->references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {}); - conn->from >> info->registrationTime >> info->narSize; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - conn->from >> info->ultimate; - info->sigs = readStrings<StringSet>(conn->from); - info->ca = parseContentAddressOpt(readString(conn->from)); - } - return info; -} - - void RemoteStore::queryPathInfoUncached(const StorePath & path, Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept { @@ -423,7 +422,8 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path, bool valid; conn->from >> valid; if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path)); } - info = readValidPathInfo(conn, path); + info = std::make_shared<ValidPathInfo>( + ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion), StorePath{path})); } callback(std::move(info)); } catch (...) { callback.rethrow(); } @@ -525,20 +525,20 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( }); } - auto path = parseStorePath(readString(conn->from)); - return readValidPathInfo(conn, path); + return make_ref<ValidPathInfo>( + ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion))); } else { if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25"); std::visit(overloaded { - [&](TextHashMethod thm) -> void { + [&](const TextHashMethod & thm) -> void { std::string s = dump.drain(); conn->to << wopAddTextToStore << name << s; worker_proto::write(*this, conn->to, references); conn.processStderr(); }, - [&](FixedOutputHashMethod fohm) -> void { + [&](const FixedOutputHashMethod & fohm) -> void { conn->to << wopAddToStore << name @@ -582,9 +582,8 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore( StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name, - FileIngestionMethod method, HashType hashType, RepairFlag repair) + FileIngestionMethod method, HashType hashType, RepairFlag repair, const StorePathSet & references) { - StorePathSet references; return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path; } @@ -642,6 +641,25 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, } +void RemoteStore::addMultipleToStore( + Source & source, + RepairFlag repair, + CheckSigsFlag checkSigs) +{ + if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { + auto conn(getConnection()); + conn->to + << wopAddMultipleToStore + << repair + << !checkSigs; + conn.withFramedSink([&](Sink & sink) { + source.drainInto(sink); + }); + } else + Store::addMultipleToStore(source, repair, checkSigs); +} + + StorePath RemoteStore::addTextToStore(const string & name, const string & s, const StorePathSet & references, RepairFlag repair) { @@ -653,36 +671,57 @@ void RemoteStore::registerDrvOutput(const Realisation & info) { auto conn(getConnection()); conn->to << wopRegisterDrvOutput; - conn->to << info.id.to_string(); - conn->to << std::string(info.outPath.to_string()); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + conn->to << info.id.to_string(); + conn->to << std::string(info.outPath.to_string()); + } else { + worker_proto::write(*this, conn->to, info); + } conn.processStderr(); } -std::optional<const Realisation> RemoteStore::queryRealisation(const DrvOutput & id) +void RemoteStore::queryRealisationUncached(const DrvOutput & id, + Callback<std::shared_ptr<const Realisation>> callback) noexcept { auto conn(getConnection()); conn->to << wopQueryRealisation; conn->to << id.to_string(); conn.processStderr(); - auto outPaths = worker_proto::read(*this, conn->from, Phantom<std::set<StorePath>>{}); - if (outPaths.empty()) - return std::nullopt; - return {Realisation{.id = id, .outPath = *outPaths.begin()}}; + + auto real = [&]() -> std::shared_ptr<const Realisation> { + if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + auto outPaths = worker_proto::read( + *this, conn->from, Phantom<std::set<StorePath>> {}); + if (outPaths.empty()) + return nullptr; + return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() }); + } else { + auto realisations = worker_proto::read( + *this, conn->from, Phantom<std::set<Realisation>> {}); + if (realisations.empty()) + return nullptr; + return std::make_shared<const Realisation>(*realisations.begin()); + } + }(); + + try { + callback(std::shared_ptr<const Realisation>(real)); + } catch (...) { return callback.rethrow(); } } static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs) { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 29) { + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 30) { worker_proto::write(store, conn->to, reqs); } else { Strings ss; for (auto & p : reqs) { auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(p); std::visit(overloaded { - [&](StorePathWithOutputs s) { + [&](const StorePathWithOutputs & s) { ss.push_back(s.to_string(store)); }, - [&](StorePath drvPath) { + [&](const StorePath & drvPath) { throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", store.printStorePath(drvPath), GET_PROTOCOL_MAJOR(conn->daemonVersion), @@ -694,8 +733,18 @@ static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, cons } } -void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMode buildMode) +void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMode buildMode, std::shared_ptr<Store> evalStore) { + if (evalStore && evalStore.get() != this) { + /* The remote doesn't have a way to access evalStore, so copy + the .drvs. */ + RealisedPath::Set drvPaths2; + for (auto & i : drvPaths) + if (auto p = std::get_if<DerivedPath::Built>(&i)) + drvPaths2.insert(p->drvPath); + copyClosure(*evalStore, *this, drvPaths2); + } + auto conn(getConnection()); conn->to << wopBuildPaths; assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); @@ -761,15 +810,6 @@ void RemoteStore::addIndirectRoot(const Path & path) } -void RemoteStore::syncWithGC() -{ - auto conn(getConnection()); - conn->to << wopSyncWithGC; - conn.processStderr(); - readInt(conn->from); -} - - Roots RemoteStore::findRoots(bool censor) { auto conn(getConnection()); @@ -990,14 +1030,14 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * return nullptr; } -void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) +void ConnectionHandle::withFramedSink(std::function<void(Sink & sink)> fun) { (*this)->to.flush(); std::exception_ptr ex; - /* Handle log messages / exceptions from the remote on a - separate thread. */ + /* Handle log messages / exceptions from the remote on a separate + thread. */ std::thread stderrThread([&]() { try { @@ -1030,7 +1070,6 @@ void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) stderrThread.join(); if (ex) std::rethrow_exception(ex); - } } |