diff options
author | John Ericson <John.Ericson@Obsidian.Systems> | 2020-09-22 14:18:31 +0000 |
---|---|---|
committer | John Ericson <John.Ericson@Obsidian.Systems> | 2020-09-22 14:18:31 +0000 |
commit | e9fc2031f0c89fe65609ce6a8211186d164152fb (patch) | |
tree | 6d14122f07ddd72cbc44ca12125461fc53e1b17a /src/libstore/remote-store.cc | |
parent | b92d3b2eddcbd87d2d4b6bbbc3c99e5fedcb3a99 (diff) | |
parent | 980edd1f3a31eefe297d073f6a7cff099f21eb4a (diff) |
Merge remote-tracking branch 'upstream/master' into templated-daemon-protocol
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r-- | src/libstore/remote-store.cc | 286 |
1 files changed, 155 insertions, 131 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 34cfdb5be..273466137 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -295,6 +295,8 @@ struct ConnectionHandle std::rethrow_exception(ex); } } + + void withFramedSink(std::function<void(Sink & sink)> fun); }; @@ -408,11 +410,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S } +ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path) +{ + auto deriver = readString(conn->from); + auto narHash = Hash::parseAny(readString(conn->from), htSHA256); + auto info = make_ref<ValidPathInfo>(path, narHash); + if (deriver != "") info->deriver = parseStorePath(deriver); + info->references = WorkerProto<StorePathSet>::read(*this, conn->from); + conn->from >> info->registrationTime >> info->narSize; + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { + conn->from >> info->ultimate; + info->sigs = readStrings<StringSet>(conn->from); + info->ca = parseContentAddressOpt(readString(conn->from)); + } + return info; +} + + void RemoteStore::queryPathInfoUncached(const StorePath & path, Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept { try { - std::shared_ptr<ValidPathInfo> info; + std::shared_ptr<const ValidPathInfo> info; { auto conn(getConnection()); conn->to << wopQueryPathInfo << printStorePath(path); @@ -428,17 +447,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path, bool valid; conn->from >> valid; if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path)); } - auto deriver = readString(conn->from); - auto narHash = Hash::parseAny(readString(conn->from), htSHA256); - info = std::make_shared<ValidPathInfo>(path, narHash); - if (deriver != "") info->deriver = parseStorePath(deriver); - info->references = WorkerProto<StorePathSet>::read(*this, conn->from); - conn->from >> info->registrationTime >> info->narSize; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - conn->from >> info->ultimate; - info->sigs = readStrings<StringSet>(conn->from); - info->ca = parseContentAddressOpt(readString(conn->from)); - } + info = readValidPathInfo(conn, path); } callback(std::move(info)); } catch (...) { callback.rethrow(); } @@ -512,6 +521,93 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string & } +ref<const ValidPathInfo> RemoteStore::addCAToStore( + Source & dump, + const string & name, + ContentAddressMethod caMethod, + const StorePathSet & references, + RepairFlag repair) +{ + std::optional<ConnectionHandle> conn_(getConnection()); + auto & conn = *conn_; + + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) { + + conn->to + << wopAddToStore + << name + << renderContentAddressMethod(caMethod); + WorkerProto<StorePathSet>::write(*this, conn->to, references); + conn->to << repair; + + conn.withFramedSink([&](Sink & sink) { + dump.drainInto(sink); + }); + + auto path = parseStorePath(readString(conn->from)); + return readValidPathInfo(conn, path); + } + else { + if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25"); + + std::visit(overloaded { + [&](TextHashMethod thm) -> void { + std::string s = dump.drain(); + conn->to << wopAddTextToStore << name << s; + WorkerProto<StorePathSet>::write(*this, conn->to, references); + conn.processStderr(); + }, + [&](FixedOutputHashMethod fohm) -> void { + conn->to + << wopAddToStore + << name + << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */ + << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0) + << printHashType(fohm.hashType); + + try { + conn->to.written = 0; + conn->to.warn = true; + connections->incCapacity(); + { + Finally cleanup([&]() { connections->decCapacity(); }); + if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) { + dump.drainInto(conn->to); + } else { + std::string contents = dump.drain(); + dumpString(contents, conn->to); + } + } + conn->to.warn = false; + conn.processStderr(); + } catch (SysError & e) { + /* Daemon closed while we were sending the path. Probably OOM + or I/O error. */ + if (e.errNo == EPIPE) + try { + conn.processStderr(); + } catch (EndOfFile & e) { } + throw; + } + + } + }, caMethod); + auto path = parseStorePath(readString(conn->from)); + // Release our connection to prevent a deadlock in queryPathInfo(). + conn_.reset(); + return queryPathInfo(path); + } +} + + +StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name, + FileIngestionMethod method, HashType hashType, RepairFlag repair) +{ + StorePathSet references; + return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path; +} + + void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, RepairFlag repair, CheckSigsFlag checkSigs) { @@ -552,78 +648,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << repair << !checkSigs; if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { - - conn->to.flush(); - - std::exception_ptr ex; - - struct FramedSink : BufferedSink - { - ConnectionHandle & conn; - std::exception_ptr & ex; - - FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex) - { } - - ~FramedSink() - { - try { - conn->to << 0; - conn->to.flush(); - } catch (...) { - ignoreException(); - } - } - - void write(const unsigned char * data, size_t len) override - { - /* Don't send more data if the remote has - encountered an error. */ - if (ex) { - auto ex2 = ex; - ex = nullptr; - std::rethrow_exception(ex2); - } - conn->to << len; - conn->to(data, len); - }; - }; - - /* Handle log messages / exceptions from the remote on a - separate thread. */ - std::thread stderrThread([&]() - { - try { - conn.processStderr(nullptr, nullptr, false); - } catch (...) { - ex = std::current_exception(); - } - }); - - Finally joinStderrThread([&]() - { - if (stderrThread.joinable()) { - stderrThread.join(); - if (ex) { - try { - std::rethrow_exception(ex); - } catch (...) { - ignoreException(); - } - } - } - }); - - { - FramedSink sink(conn, ex); + conn.withFramedSink([&](Sink & sink) { copyNAR(source, sink); - sink.flush(); - } - - stderrThread.join(); - if (ex) - std::rethrow_exception(ex); - + }); } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { conn.processStderr(0, &source); } else { @@ -634,57 +661,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, } -StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath, - FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair) -{ - if (repair) throw Error("repairing is not supported when building through the Nix daemon"); - - auto conn(getConnection()); - - Path srcPath(absPath(_srcPath)); - - conn->to - << wopAddToStore - << name - << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */ - << (method == FileIngestionMethod::Recursive ? 1 : 0) - << printHashType(hashAlgo); - - try { - conn->to.written = 0; - conn->to.warn = true; - connections->incCapacity(); - { - Finally cleanup([&]() { connections->decCapacity(); }); - dumpPath(srcPath, conn->to, filter); - } - conn->to.warn = false; - conn.processStderr(); - } catch (SysError & e) { - /* Daemon closed while we were sending the path. Probably OOM - or I/O error. */ - if (e.errNo == EPIPE) - try { - conn.processStderr(); - } catch (EndOfFile & e) { } - throw; - } - - return parseStorePath(readString(conn->from)); -} - - StorePath RemoteStore::addTextToStore(const string & name, const string & s, const StorePathSet & references, RepairFlag repair) { - if (repair) throw Error("repairing is not supported when building through the Nix daemon"); - - auto conn(getConnection()); - conn->to << wopAddTextToStore << name << s; - WorkerProto<StorePathSet>::write(*this, conn->to, references); - - conn.processStderr(); - return parseStorePath(readString(conn->from)); + StringSource source(s); + return addCAToStore(source, name, TextHashMethod{}, references, repair)->path; } @@ -980,6 +961,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * return nullptr; } +void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) +{ + (*this)->to.flush(); + + std::exception_ptr ex; + + /* Handle log messages / exceptions from the remote on a + separate thread. */ + std::thread stderrThread([&]() + { + try { + processStderr(nullptr, nullptr, false); + } catch (...) { + ex = std::current_exception(); + } + }); + + Finally joinStderrThread([&]() + { + if (stderrThread.joinable()) { + stderrThread.join(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (...) { + ignoreException(); + } + } + } + }); + + { + FramedSink sink((*this)->to, ex); + fun(sink); + sink.flush(); + } + + stderrThread.join(); + if (ex) + std::rethrow_exception(ex); + +} + static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore; } |