aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc674
1 files changed, 378 insertions, 296 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 33d1e431b..761b4a087 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -1,5 +1,7 @@
#include "serialise.hh"
#include "util.hh"
+#include "path-with-outputs.hh"
+#include "remote-fs-accessor.hh"
#include "remote-store.hh"
#include "worker-protocol.hh"
#include "archive.hh"
@@ -9,169 +11,148 @@
#include "pool.hh"
#include "finally.hh"
#include "logging.hh"
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-#include <cstring>
+#include "callback.hh"
+#include "filetransfer.hh"
+#include <nlohmann/json.hpp>
namespace nix {
+namespace worker_proto {
-template<> StorePathSet readStorePaths(const Store & store, Source & from)
+std::string read(const Store & store, Source & from, Phantom<std::string> _)
{
- StorePathSet paths;
- for (auto & i : readStrings<Strings>(from))
- paths.insert(store.parseStorePath(i));
- return paths;
+ return readString(from);
}
+void write(const Store & store, Sink & out, const std::string & str)
+{
+ out << str;
+}
-void writeStorePaths(const Store & store, Sink & out, const StorePathSet & paths)
+
+StorePath read(const Store & store, Source & from, Phantom<StorePath> _)
{
- out << paths.size();
- for (auto & i : paths)
- out << store.printStorePath(i);
+ return store.parseStorePath(readString(from));
}
-StorePathCAMap readStorePathCAMap(const Store & store, Source & from)
+void write(const Store & store, Sink & out, const StorePath & storePath)
{
- StorePathCAMap paths;
- auto count = readNum<size_t>(from);
- while (count--)
- paths.insert_or_assign(store.parseStorePath(readString(from)), parseContentAddressOpt(readString(from)));
- return paths;
+ out << store.printStorePath(storePath);
}
-void writeStorePathCAMap(const Store & store, Sink & out, const StorePathCAMap & paths)
+
+ContentAddress read(const Store & store, Source & from, Phantom<ContentAddress> _)
{
- out << paths.size();
- for (auto & i : paths) {
- out << store.printStorePath(i.first);
- out << renderContentAddress(i.second);
- }
+ return parseContentAddress(readString(from));
}
-std::map<string, StorePath> readOutputPathMap(const Store & store, Source & from)
+void write(const Store & store, Sink & out, const ContentAddress & ca)
{
- std::map<string, StorePath> pathMap;
- auto rawInput = readStrings<Strings>(from);
- if (rawInput.size() % 2)
- throw Error("got an odd number of elements from the daemon when trying to read a output path map");
- auto curInput = rawInput.begin();
- while (curInput != rawInput.end()) {
- auto thisKey = *curInput++;
- auto thisValue = *curInput++;
- pathMap.emplace(thisKey, store.parseStorePath(thisValue));
- }
- return pathMap;
+ out << renderContentAddress(ca);
}
-void writeOutputPathMap(const Store & store, Sink & out, const std::map<string, StorePath> & pathMap)
+
+DerivedPath read(const Store & store, Source & from, Phantom<DerivedPath> _)
{
- out << 2*pathMap.size();
- for (auto & i : pathMap) {
- out << i.first;
- out << store.printStorePath(i.second);
- }
+ auto s = readString(from);
+ return DerivedPath::parse(store, s);
}
-/* TODO: Separate these store impls into different files, give them better names */
-RemoteStore::RemoteStore(const Params & params)
- : Store(params)
- , connections(make_ref<Pool<Connection>>(
- std::max(1, (int) maxConnections),
- [this]() { return openConnectionWrapper(); },
- [this](const ref<Connection> & r) {
- return
- r->to.good()
- && r->from.good()
- && std::chrono::duration_cast<std::chrono::seconds>(
- std::chrono::steady_clock::now() - r->startTime).count() < maxConnectionAge;
- }
- ))
+void write(const Store & store, Sink & out, const DerivedPath & req)
{
+ out << req.to_string(store);
}
-ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper()
+Realisation read(const Store & store, Source & from, Phantom<Realisation> _)
{
- if (failed)
- throw Error("opening a connection to remote store '%s' previously failed", getUri());
- try {
- return openConnection();
- } catch (...) {
- failed = true;
- throw;
- }
+ std::string rawInput = readString(from);
+ return Realisation::fromJSON(
+ nlohmann::json::parse(rawInput),
+ "remote-protocol"
+ );
}
-
-UDSRemoteStore::UDSRemoteStore(const Params & params)
- : Store(params)
- , LocalFSStore(params)
- , RemoteStore(params)
+void write(const Store & store, Sink & out, const Realisation & realisation)
{
+ out << realisation.toJSON().dump();
}
-UDSRemoteStore::UDSRemoteStore(std::string socket_path, const Params & params)
- : Store(params)
- , LocalFSStore(params)
- , RemoteStore(params)
- , path(socket_path)
+DrvOutput read(const Store & store, Source & from, Phantom<DrvOutput> _)
{
+ return DrvOutput::parse(readString(from));
}
-
-std::string UDSRemoteStore::getUri()
+void write(const Store & store, Sink & out, const DrvOutput & drvOutput)
{
- if (path) {
- return std::string("unix://") + *path;
- } else {
- return "daemon";
- }
+ out << drvOutput.to_string();
}
-ref<RemoteStore::Connection> UDSRemoteStore::openConnection()
+std::optional<StorePath> read(const Store & store, Source & from, Phantom<std::optional<StorePath>> _)
{
- auto conn = make_ref<Connection>();
+ auto s = readString(from);
+ return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
+}
- /* Connect to a daemon that does the privileged work for us. */
- conn->fd = socket(PF_UNIX, SOCK_STREAM
- #ifdef SOCK_CLOEXEC
- | SOCK_CLOEXEC
- #endif
- , 0);
- if (!conn->fd)
- throw SysError("cannot create Unix domain socket");
- closeOnExec(conn->fd.get());
+void write(const Store & store, Sink & out, const std::optional<StorePath> & storePathOpt)
+{
+ out << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
+}
- string socketPath = path ? *path : settings.nixDaemonSocketFile;
- struct sockaddr_un addr;
- addr.sun_family = AF_UNIX;
- if (socketPath.size() + 1 >= sizeof(addr.sun_path))
- throw Error("socket path '%1%' is too long", socketPath);
- strcpy(addr.sun_path, socketPath.c_str());
+std::optional<ContentAddress> read(const Store & store, Source & from, Phantom<std::optional<ContentAddress>> _)
+{
+ return parseContentAddressOpt(readString(from));
+}
- if (::connect(conn->fd.get(), (struct sockaddr *) &addr, sizeof(addr)) == -1)
- throw SysError("cannot connect to daemon at '%1%'", socketPath);
+void write(const Store & store, Sink & out, const std::optional<ContentAddress> & caOpt)
+{
+ out << (caOpt ? renderContentAddress(*caOpt) : "");
+}
- conn->from.fd = conn->fd.get();
- conn->to.fd = conn->fd.get();
+}
- conn->startTime = std::chrono::steady_clock::now();
- initConnection(*conn);
+/* TODO: Separate these store impls into different files, give them better names */
+RemoteStore::RemoteStore(const Params & params)
+ : RemoteStoreConfig(params)
+ , Store(params)
+ , connections(make_ref<Pool<Connection>>(
+ std::max(1, (int) maxConnections),
+ [this]() {
+ auto conn = openConnectionWrapper();
+ try {
+ initConnection(*conn);
+ } catch (...) {
+ failed = true;
+ throw;
+ }
+ return conn;
+ },
+ [this](const ref<Connection> & r) {
+ return
+ r->to.good()
+ && r->from.good()
+ && std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::steady_clock::now() - r->startTime).count() < maxConnectionAge;
+ }
+ ))
+{
+}
- return conn;
+
+ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper()
+{
+ if (failed)
+ throw Error("opening a connection to remote store '%s' previously failed", getUri());
+ try {
+ return openConnection();
+ } catch (...) {
+ failed = true;
+ throw;
+ }
}
@@ -231,7 +212,8 @@ void RemoteStore::setOptions(Connection & conn)
if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
std::map<std::string, Config::SettingInfo> overrides;
- globalConfig.getSettings(overrides, true);
+ settings.getSettings(overrides, true); // libstore settings
+ fileTransferSettings.getSettings(overrides, true);
overrides.erase(settings.keepFailed.name);
overrides.erase(settings.keepGoing.name);
overrides.erase(settings.tryFallback.name);
@@ -278,14 +260,16 @@ struct ConnectionHandle
RemoteStore::Connection * operator -> () { return &*handle; }
- void processStderr(Sink * sink = 0, Source * source = 0)
+ void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true)
{
- auto ex = handle->processStderr(sink, source);
+ auto ex = handle->processStderr(sink, source, flush);
if (ex) {
daemonException = true;
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -314,9 +298,12 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute
return res;
} else {
conn->to << wopQueryValidPaths;
- writeStorePaths(*this, conn->to, paths);
+ worker_proto::write(*this, conn->to, paths);
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) {
+ conn->to << (settings.buildersUseSubstitutes ? 1 : 0);
+ }
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
}
@@ -326,7 +313,7 @@ StorePathSet RemoteStore::queryAllValidPaths()
auto conn(getConnection());
conn->to << wopQueryAllValidPaths;
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
@@ -343,9 +330,9 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
return res;
} else {
conn->to << wopQuerySubstitutablePaths;
- writeStorePaths(*this, conn->to, paths);
+ worker_proto::write(*this, conn->to, paths);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
}
@@ -367,7 +354,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
auto deriver = readString(conn->from);
if (deriver != "")
info.deriver = parseStorePath(deriver);
- info.references = readStorePaths<StorePathSet>(*this, conn->from);
+ info.references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
infos.insert_or_assign(i.first, std::move(info));
@@ -380,9 +367,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
StorePathSet paths;
for (auto & path : pathsMap)
paths.insert(path.first);
- writeStorePaths(*this, conn->to, paths);
+ worker_proto::write(*this, conn->to, paths);
} else
- writeStorePathCAMap(*this, conn->to, pathsMap);
+ worker_proto::write(*this, conn->to, pathsMap);
conn.processStderr();
size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) {
@@ -390,7 +377,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
auto deriver = readString(conn->from);
if (deriver != "")
info.deriver = parseStorePath(deriver);
- info.references = readStorePaths<StorePathSet>(*this, conn->from);
+ info.references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
}
@@ -399,11 +386,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
}
+ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
+{
+ auto deriver = readString(conn->from);
+ auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
+ auto info = make_ref<ValidPathInfo>(path, narHash);
+ if (deriver != "") info->deriver = parseStorePath(deriver);
+ info->references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ conn->from >> info->registrationTime >> info->narSize;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ conn->from >> info->ultimate;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = parseContentAddressOpt(readString(conn->from));
+ }
+ return info;
+}
+
+
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
- std::shared_ptr<ValidPathInfo> info;
+ std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@@ -419,17 +423,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
- info = std::make_shared<ValidPathInfo>(StorePath(path));
- auto deriver = readString(conn->from);
- if (deriver != "") info->deriver = parseStorePath(deriver);
- info->narHash = Hash::parseAny(readString(conn->from), htSHA256);
- info->references = readStorePaths<StorePathSet>(*this, conn->from);
- conn->from >> info->registrationTime >> info->narSize;
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- conn->from >> info->ultimate;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = parseContentAddressOpt(readString(conn->from));
- }
+ info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@@ -442,7 +436,7 @@ void RemoteStore::queryReferrers(const StorePath & path,
auto conn(getConnection());
conn->to << wopQueryReferrers << printStorePath(path);
conn.processStderr();
- for (auto & i : readStorePaths<StorePathSet>(*this, conn->from))
+ for (auto & i : worker_proto::read(*this, conn->from, Phantom<StorePathSet> {}))
referrers.insert(i);
}
@@ -452,29 +446,44 @@ StorePathSet RemoteStore::queryValidDerivers(const StorePath & path)
auto conn(getConnection());
conn->to << wopQueryValidDerivers << printStorePath(path);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path)
{
- auto conn(getConnection());
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 0x16) {
+ if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) {
return Store::queryDerivationOutputs(path);
}
+ auto conn(getConnection());
conn->to << wopQueryDerivationOutputs << printStorePath(path);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
-OutputPathMap RemoteStore::queryDerivationOutputMap(const StorePath & path)
+std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivationOutputMap(const StorePath & path)
{
- auto conn(getConnection());
- conn->to << wopQueryDerivationOutputMap << printStorePath(path);
- conn.processStderr();
- return readOutputPathMap(*this, conn->from);
-
+ if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) {
+ auto conn(getConnection());
+ conn->to << wopQueryDerivationOutputMap << printStorePath(path);
+ conn.processStderr();
+ return worker_proto::read(*this, conn->from, Phantom<std::map<std::string, std::optional<StorePath>>> {});
+ } else {
+ // Fallback for old daemon versions.
+ // For floating-CA derivations (and their co-dependencies) this is an
+ // under-approximation as it only returns the paths that can be inferred
+ // from the derivation itself (and not the ones that are known because
+ // the have been built), but as old stores don't handle floating-CA
+ // derivations this shouldn't matter
+ auto derivation = readDerivation(path);
+ auto outputsWithOptPaths = derivation.outputsAndOptPaths(*this);
+ std::map<std::string, std::optional<StorePath>> ret;
+ for (auto & [outputName, outputAndPath] : outputsWithOptPaths) {
+ ret.emplace(outputName, outputAndPath.second);
+ }
+ return ret;
+ }
}
std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string & hashPart)
@@ -488,6 +497,98 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
+ref<const ValidPathInfo> RemoteStore::addCAToStore(
+ Source & dump,
+ const string & name,
+ ContentAddressMethod caMethod,
+ const StorePathSet & references,
+ RepairFlag repair)
+{
+ std::optional<ConnectionHandle> conn_(getConnection());
+ auto & conn = *conn_;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
+
+ conn->to
+ << wopAddToStore
+ << name
+ << renderContentAddressMethod(caMethod);
+ worker_proto::write(*this, conn->to, references);
+ conn->to << repair;
+
+ // The dump source may invoke the store, so we need to make some room.
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ conn.withFramedSink([&](Sink & sink) {
+ dump.drainInto(sink);
+ });
+ }
+
+ auto path = parseStorePath(readString(conn->from));
+ return readValidPathInfo(conn, path);
+ }
+ else {
+ if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
+
+ std::visit(overloaded {
+ [&](TextHashMethod thm) -> void {
+ std::string s = dump.drain();
+ conn->to << wopAddTextToStore << name << s;
+ worker_proto::write(*this, conn->to, references);
+ conn.processStderr();
+ },
+ [&](FixedOutputHashMethod fohm) -> void {
+ conn->to
+ << wopAddToStore
+ << name
+ << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
+ << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
+ << printHashType(fohm.hashType);
+
+ try {
+ conn->to.written = 0;
+ conn->to.warn = true;
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
+ dump.drainInto(conn->to);
+ } else {
+ std::string contents = dump.drain();
+ dumpString(contents, conn->to);
+ }
+ }
+ conn->to.warn = false;
+ conn.processStderr();
+ } catch (SysError & e) {
+ /* Daemon closed while we were sending the path. Probably OOM
+ or I/O error. */
+ if (e.errNo == EPIPE)
+ try {
+ conn.processStderr();
+ } catch (EndOfFile & e) { }
+ throw;
+ }
+
+ }
+ }, caMethod);
+ auto path = parseStorePath(readString(conn->from));
+ // Release our connection to prevent a deadlock in queryPathInfo().
+ conn_.reset();
+ return queryPathInfo(path);
+ }
+}
+
+
+StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashType, RepairFlag repair)
+{
+ StorePathSet references;
+ return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
+}
+
+
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs)
{
@@ -503,7 +604,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
sink
<< exportMagic
<< printStorePath(info.path);
- writeStorePaths(*this, sink, info.references);
+ worker_proto::write(*this, sink, info.references);
sink
<< (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 // == no legacy signature
@@ -513,7 +614,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn.processStderr(0, source2.get());
- auto importedPaths = readStorePaths<StorePathSet>(*this, conn->from);
+ auto importedPaths = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
assert(importedPaths.size() <= 1);
}
@@ -521,83 +622,16 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn->to << wopAddToStoreNar
<< printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "")
- << info.narHash->to_string(Base16, false);
- writeStorePaths(*this, conn->to, info.references);
+ << info.narHash.to_string(Base16, false);
+ worker_proto::write(*this, conn->to, info.references);
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
-
- std::exception_ptr ex;
-
- struct FramedSink : BufferedSink
- {
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
- };
-
- /* Handle log messages / exceptions from the remote on a
- separate thread. */
- std::thread stderrThread([&]()
- {
- try {
- conn.processStderr();
- } catch (...) {
- ex = std::current_exception();
- }
- });
-
- Finally joinStderrThread([&]()
- {
- if (stderrThread.joinable()) {
- stderrThread.join();
- if (ex) {
- try {
- std::rethrow_exception(ex);
- } catch (...) {
- ignoreException();
- }
- }
- }
- });
-
- {
- FramedSink sink(conn, ex);
+ conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
- sink.flush();
- }
-
- stderrThread.join();
- if (ex)
- std::rethrow_exception(ex);
-
+ });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@@ -608,69 +642,64 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}
-StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
- FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
+StorePath RemoteStore::addTextToStore(const string & name, const string & s,
+ const StorePathSet & references, RepairFlag repair)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
+ StringSource source(s);
+ return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
+}
+void RemoteStore::registerDrvOutput(const Realisation & info)
+{
auto conn(getConnection());
-
- Path srcPath(absPath(_srcPath));
-
- conn->to
- << wopAddToStore
- << name
- << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
- << (method == FileIngestionMethod::Recursive ? 1 : 0)
- << printHashType(hashAlgo);
-
- try {
- conn->to.written = 0;
- conn->to.warn = true;
- connections->incCapacity();
- {
- Finally cleanup([&]() { connections->decCapacity(); });
- dumpPath(srcPath, conn->to, filter);
- }
- conn->to.warn = false;
- conn.processStderr();
- } catch (SysError & e) {
- /* Daemon closed while we were sending the path. Probably OOM
- or I/O error. */
- if (e.errNo == EPIPE)
- try {
- conn.processStderr();
- } catch (EndOfFile & e) { }
- throw;
- }
-
- return parseStorePath(readString(conn->from));
+ conn->to << wopRegisterDrvOutput;
+ conn->to << info.id.to_string();
+ conn->to << std::string(info.outPath.to_string());
+ conn.processStderr();
}
-
-StorePath RemoteStore::addTextToStore(const string & name, const string & s,
- const StorePathSet & references, RepairFlag repair)
+std::optional<const Realisation> RemoteStore::queryRealisation(const DrvOutput & id)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
auto conn(getConnection());
- conn->to << wopAddTextToStore << name << s;
- writeStorePaths(*this, conn->to, references);
-
+ conn->to << wopQueryRealisation;
+ conn->to << id.to_string();
conn.processStderr();
- return parseStorePath(readString(conn->from));
+ auto outPaths = worker_proto::read(*this, conn->from, Phantom<std::set<StorePath>>{});
+ if (outPaths.empty())
+ return std::nullopt;
+ return {Realisation{.id = id, .outPath = *outPaths.begin()}};
}
+static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs)
+{
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 29) {
+ worker_proto::write(store, conn->to, reqs);
+ } else {
+ Strings ss;
+ for (auto & p : reqs) {
+ auto sOrDrvPath = StorePathWithOutputs::tryFromDerivedPath(p);
+ std::visit(overloaded {
+ [&](StorePathWithOutputs s) {
+ ss.push_back(s.to_string(store));
+ },
+ [&](StorePath drvPath) {
+ throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file",
+ store.printStorePath(drvPath),
+ GET_PROTOCOL_MAJOR(conn->daemonVersion),
+ GET_PROTOCOL_MINOR(conn->daemonVersion));
+ },
+ }, sOrDrvPath);
+ }
+ conn->to << ss;
+ }
+}
-void RemoteStore::buildPaths(const std::vector<StorePathWithOutputs> & drvPaths, BuildMode buildMode)
+void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMode buildMode)
{
auto conn(getConnection());
conn->to << wopBuildPaths;
assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13);
- Strings ss;
- for (auto & p : drvPaths)
- ss.push_back(p.to_string(*this));
- conn->to << ss;
+ writeDerivedPaths(*this, conn, drvPaths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
conn->to << buildMode;
else
@@ -692,9 +721,15 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD
conn->to << buildMode;
conn.processStderr();
BuildResult res;
- unsigned int status;
- conn->from >> status >> res.errorMsg;
- res.status = (BuildResult::Status) status;
+ res.status = (BuildResult::Status) readInt(conn->from);
+ conn->from >> res.errorMsg;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 29) {
+ conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime;
+ }
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) {
+ auto builtOutputs = worker_proto::read(*this, conn->from, Phantom<DrvOutputs> {});
+ res.builtOutputs = builtOutputs;
+ }
return res;
}
@@ -757,7 +792,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
conn->to
<< wopCollectGarbage << options.action;
- writeStorePaths(*this, conn->to, options.pathsToDelete);
+ worker_proto::write(*this, conn->to, options.pathsToDelete);
conn->to << options.ignoreLiveness
<< options.maxFreed
/* removed options */
@@ -803,7 +838,7 @@ void RemoteStore::addSignatures(const StorePath & storePath, const StringSet & s
}
-void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets,
+void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown,
uint64_t & downloadSize, uint64_t & narSize)
{
@@ -814,14 +849,11 @@ void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets
// to prevent a deadlock.
goto fallback;
conn->to << wopQueryMissing;
- Strings ss;
- for (auto & p : targets)
- ss.push_back(p.to_string(*this));
- conn->to << ss;
+ writeDerivedPaths(*this, conn, targets);
conn.processStderr();
- willBuild = readStorePaths<StorePathSet>(*this, conn->from);
- willSubstitute = readStorePaths<StorePathSet>(*this, conn->from);
- unknown = readStorePaths<StorePathSet>(*this, conn->from);
+ willBuild = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ willSubstitute = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ unknown = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
conn->from >> downloadSize >> narSize;
return;
}
@@ -860,6 +892,18 @@ RemoteStore::Connection::~Connection()
}
}
+void RemoteStore::narFromPath(const StorePath & path, Sink & sink)
+{
+ auto conn(connections->get());
+ conn->to << wopNarFromPath << printStorePath(path);
+ conn->processStderr();
+ copyNAR(conn->from, sink);
+}
+
+ref<FSAccessor> RemoteStore::getFSAccessor()
+{
+ return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this()));
+}
static Logger::Fields readFields(Source & from)
{
@@ -878,9 +922,10 @@ static Logger::Fields readFields(Source & from)
}
-std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source)
+std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source, bool flush)
{
- to.flush();
+ if (flush)
+ to.flush();
while (true) {
@@ -895,15 +940,19 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
else if (msg == STDERR_READ) {
if (!source) throw Error("no source");
size_t len = readNum<size_t>(from);
- auto buf = std::make_unique<unsigned char[]>(len);
- writeString(buf.get(), source->read(buf.get(), len), to);
+ auto buf = std::make_unique<char[]>(len);
+ writeString({(const char *) buf.get(), source->read(buf.get(), len)}, to);
to.flush();
}
else if (msg == STDERR_ERROR) {
- string error = readString(from);
- unsigned int status = readInt(from);
- return std::make_exception_ptr(Error(status, error));
+ if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) {
+ return std::make_exception_ptr(readError(from));
+ } else {
+ string error = readString(from);
+ unsigned int status = readInt(from);
+ return std::make_exception_ptr(Error(status, error));
+ }
}
else if (msg == STDERR_NEXT)
@@ -941,14 +990,47 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
-static std::string uriScheme = "unix://";
-
-static RegisterStoreImplementation regStore([](
- const std::string & uri, const Store::Params & params)
- -> std::shared_ptr<Store>
+void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
{
- if (std::string(uri, 0, uriScheme.size()) != uriScheme) return 0;
- return std::make_shared<UDSRemoteStore>(std::string(uri, uriScheme.size()), params);
-});
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink((*this)->to, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
}