aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
authorJohn Ericson <John.Ericson@Obsidian.Systems>2020-09-22 14:18:31 +0000
committerJohn Ericson <John.Ericson@Obsidian.Systems>2020-09-22 14:18:31 +0000
commite9fc2031f0c89fe65609ce6a8211186d164152fb (patch)
tree6d14122f07ddd72cbc44ca12125461fc53e1b17a /src/libstore/remote-store.cc
parentb92d3b2eddcbd87d2d4b6bbbc3c99e5fedcb3a99 (diff)
parent980edd1f3a31eefe297d073f6a7cff099f21eb4a (diff)
Merge remote-tracking branch 'upstream/master' into templated-daemon-protocol
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc286
1 files changed, 155 insertions, 131 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 34cfdb5be..273466137 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -295,6 +295,8 @@ struct ConnectionHandle
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -408,11 +410,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
}
+ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
+{
+ auto deriver = readString(conn->from);
+ auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
+ auto info = make_ref<ValidPathInfo>(path, narHash);
+ if (deriver != "") info->deriver = parseStorePath(deriver);
+ info->references = WorkerProto<StorePathSet>::read(*this, conn->from);
+ conn->from >> info->registrationTime >> info->narSize;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ conn->from >> info->ultimate;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = parseContentAddressOpt(readString(conn->from));
+ }
+ return info;
+}
+
+
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
- std::shared_ptr<ValidPathInfo> info;
+ std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@@ -428,17 +447,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
- auto deriver = readString(conn->from);
- auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
- info = std::make_shared<ValidPathInfo>(path, narHash);
- if (deriver != "") info->deriver = parseStorePath(deriver);
- info->references = WorkerProto<StorePathSet>::read(*this, conn->from);
- conn->from >> info->registrationTime >> info->narSize;
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- conn->from >> info->ultimate;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = parseContentAddressOpt(readString(conn->from));
- }
+ info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@@ -512,6 +521,93 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
+ref<const ValidPathInfo> RemoteStore::addCAToStore(
+ Source & dump,
+ const string & name,
+ ContentAddressMethod caMethod,
+ const StorePathSet & references,
+ RepairFlag repair)
+{
+ std::optional<ConnectionHandle> conn_(getConnection());
+ auto & conn = *conn_;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
+
+ conn->to
+ << wopAddToStore
+ << name
+ << renderContentAddressMethod(caMethod);
+ WorkerProto<StorePathSet>::write(*this, conn->to, references);
+ conn->to << repair;
+
+ conn.withFramedSink([&](Sink & sink) {
+ dump.drainInto(sink);
+ });
+
+ auto path = parseStorePath(readString(conn->from));
+ return readValidPathInfo(conn, path);
+ }
+ else {
+ if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
+
+ std::visit(overloaded {
+ [&](TextHashMethod thm) -> void {
+ std::string s = dump.drain();
+ conn->to << wopAddTextToStore << name << s;
+ WorkerProto<StorePathSet>::write(*this, conn->to, references);
+ conn.processStderr();
+ },
+ [&](FixedOutputHashMethod fohm) -> void {
+ conn->to
+ << wopAddToStore
+ << name
+ << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
+ << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
+ << printHashType(fohm.hashType);
+
+ try {
+ conn->to.written = 0;
+ conn->to.warn = true;
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
+ dump.drainInto(conn->to);
+ } else {
+ std::string contents = dump.drain();
+ dumpString(contents, conn->to);
+ }
+ }
+ conn->to.warn = false;
+ conn.processStderr();
+ } catch (SysError & e) {
+ /* Daemon closed while we were sending the path. Probably OOM
+ or I/O error. */
+ if (e.errNo == EPIPE)
+ try {
+ conn.processStderr();
+ } catch (EndOfFile & e) { }
+ throw;
+ }
+
+ }
+ }, caMethod);
+ auto path = parseStorePath(readString(conn->from));
+ // Release our connection to prevent a deadlock in queryPathInfo().
+ conn_.reset();
+ return queryPathInfo(path);
+ }
+}
+
+
+StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashType, RepairFlag repair)
+{
+ StorePathSet references;
+ return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
+}
+
+
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs)
{
@@ -552,78 +648,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
-
- conn->to.flush();
-
- std::exception_ptr ex;
-
- struct FramedSink : BufferedSink
- {
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
- };
-
- /* Handle log messages / exceptions from the remote on a
- separate thread. */
- std::thread stderrThread([&]()
- {
- try {
- conn.processStderr(nullptr, nullptr, false);
- } catch (...) {
- ex = std::current_exception();
- }
- });
-
- Finally joinStderrThread([&]()
- {
- if (stderrThread.joinable()) {
- stderrThread.join();
- if (ex) {
- try {
- std::rethrow_exception(ex);
- } catch (...) {
- ignoreException();
- }
- }
- }
- });
-
- {
- FramedSink sink(conn, ex);
+ conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
- sink.flush();
- }
-
- stderrThread.join();
- if (ex)
- std::rethrow_exception(ex);
-
+ });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@@ -634,57 +661,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}
-StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
- FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
-{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
-
- Path srcPath(absPath(_srcPath));
-
- conn->to
- << wopAddToStore
- << name
- << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
- << (method == FileIngestionMethod::Recursive ? 1 : 0)
- << printHashType(hashAlgo);
-
- try {
- conn->to.written = 0;
- conn->to.warn = true;
- connections->incCapacity();
- {
- Finally cleanup([&]() { connections->decCapacity(); });
- dumpPath(srcPath, conn->to, filter);
- }
- conn->to.warn = false;
- conn.processStderr();
- } catch (SysError & e) {
- /* Daemon closed while we were sending the path. Probably OOM
- or I/O error. */
- if (e.errNo == EPIPE)
- try {
- conn.processStderr();
- } catch (EndOfFile & e) { }
- throw;
- }
-
- return parseStorePath(readString(conn->from));
-}
-
-
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
- conn->to << wopAddTextToStore << name << s;
- WorkerProto<StorePathSet>::write(*this, conn->to, references);
-
- conn.processStderr();
- return parseStorePath(readString(conn->from));
+ StringSource source(s);
+ return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
}
@@ -980,6 +961,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
+void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
+{
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink((*this)->to, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
+
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
}