aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
authorJohn Ericson <John.Ericson@Obsidian.Systems>2020-09-23 22:35:41 +0000
committerJohn Ericson <John.Ericson@Obsidian.Systems>2020-09-23 22:35:41 +0000
commit45ca7c3e4b92bbafbfa8e30513c9dd3cfe76e3f1 (patch)
tree687ae85bfed1558f05880dfdb8bc358520134c52 /src/libstore/remote-store.cc
parente61061c88e0dfcce9329ea9f0b041a35270dfa1a (diff)
parent8d9402f411643c451cf2a0776afcb3a1af0f9a8c (diff)
Merge remote-tracking branch 'upstream/master' into path-info
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc422
1 files changed, 245 insertions, 177 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 273455bae..831db0bb8 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -1,5 +1,6 @@
#include "serialise.hh"
#include "util.hh"
+#include "remote-fs-accessor.hh"
#include "remote-store.hh"
#include "worker-protocol.hh"
#include "archive.hh"
@@ -9,6 +10,7 @@
#include "pool.hh"
#include "finally.hh"
#include "logging.hh"
+#include "callback.hh"
#include <sys/types.h>
#include <sys/stat.h>
@@ -31,7 +33,6 @@ template<> StorePathSet readStorePaths(const Store & store, Source & from)
return paths;
}
-
void writeStorePaths(const Store & store, Sink & out, const StorePathSet & paths)
{
out << paths.size();
@@ -39,12 +40,16 @@ void writeStorePaths(const Store & store, Sink & out, const StorePathSet & paths
out << store.printStorePath(i);
}
+
StorePathCAMap readStorePathCAMap(const Store & store, Source & from)
{
StorePathCAMap paths;
auto count = readNum<size_t>(from);
- while (count--)
- paths.insert_or_assign(store.parseStorePath(readString(from)), parseContentAddressOpt(readString(from)));
+ while (count--) {
+ auto path = store.parseStorePath(readString(from));
+ auto ca = parseContentAddressOpt(readString(from));
+ paths.insert_or_assign(path, ca);
+ }
return paths;
}
@@ -57,36 +62,52 @@ void writeStorePathCAMap(const Store & store, Sink & out, const StorePathCAMap &
}
}
-std::map<string, StorePath> readOutputPathMap(const Store & store, Source & from)
+
+namespace worker_proto {
+
+StorePath read(const Store & store, Source & from, Phantom<StorePath> _)
{
- std::map<string, StorePath> pathMap;
- auto rawInput = readStrings<Strings>(from);
- if (rawInput.size() % 2)
- throw Error("got an odd number of elements from the daemon when trying to read a output path map");
- auto curInput = rawInput.begin();
- while (curInput != rawInput.end()) {
- auto thisKey = *curInput++;
- auto thisValue = *curInput++;
- pathMap.emplace(thisKey, store.parseStorePath(thisValue));
- }
- return pathMap;
+ return store.parseStorePath(readString(from));
}
-void writeOutputPathMap(const Store & store, Sink & out, const std::map<string, StorePath> & pathMap)
+void write(const Store & store, Sink & out, const StorePath & storePath)
{
- out << 2*pathMap.size();
- for (auto & i : pathMap) {
- out << i.first;
- out << store.printStorePath(i.second);
- }
+ out << store.printStorePath(storePath);
+}
+
+
+template<>
+std::optional<StorePath> read(const Store & store, Source & from, Phantom<std::optional<StorePath>> _)
+{
+ auto s = readString(from);
+ return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
}
+template<>
+void write(const Store & store, Sink & out, const std::optional<StorePath> & storePathOpt)
+{
+ out << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
+}
+
+}
+
+
/* TODO: Separate these store impls into different files, give them better names */
RemoteStore::RemoteStore(const Params & params)
: Store(params)
+ , RemoteStoreConfig(params)
, connections(make_ref<Pool<Connection>>(
std::max(1, (int) maxConnections),
- [this]() { return openConnectionWrapper(); },
+ [this]() {
+ auto conn = openConnectionWrapper();
+ try {
+ initConnection(*conn);
+ } catch (...) {
+ failed = true;
+ throw;
+ }
+ return conn;
+ },
[this](const ref<Connection> & r) {
return
r->to.good()
@@ -113,19 +134,21 @@ ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper()
UDSRemoteStore::UDSRemoteStore(const Params & params)
- : Store(params)
+ : StoreConfig(params)
+ , Store(params)
, LocalFSStore(params)
, RemoteStore(params)
{
}
-UDSRemoteStore::UDSRemoteStore(std::string socket_path, const Params & params)
- : Store(params)
- , LocalFSStore(params)
- , RemoteStore(params)
- , path(socket_path)
+UDSRemoteStore::UDSRemoteStore(
+ const std::string scheme,
+ std::string socket_path,
+ const Params & params)
+ : UDSRemoteStore(params)
{
+ path.emplace(socket_path);
}
@@ -169,8 +192,6 @@ ref<RemoteStore::Connection> UDSRemoteStore::openConnection()
conn->startTime = std::chrono::steady_clock::now();
- initConnection(*conn);
-
return conn;
}
@@ -278,14 +299,16 @@ struct ConnectionHandle
RemoteStore::Connection * operator -> () { return &*handle; }
- void processStderr(Sink * sink = 0, Source * source = 0)
+ void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true)
{
- auto ex = handle->processStderr(sink, source);
+ auto ex = handle->processStderr(sink, source, flush);
if (ex) {
daemonException = true;
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -400,11 +423,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
}
+ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
+{
+ auto deriver = readString(conn->from);
+ auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
+ auto info = make_ref<ValidPathInfo>(path, narHash);
+ if (deriver != "") info->deriver = parseStorePath(deriver);
+ info->setReferencesPossiblyToSelf(readStorePaths<StorePathSet>(*this, conn->from));
+ conn->from >> info->registrationTime >> info->narSize;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ conn->from >> info->ultimate;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = parseContentAddressOpt(readString(conn->from));
+ }
+ return info;
+}
+
+
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
- std::shared_ptr<ValidPathInfo> info;
+ std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@@ -420,17 +460,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
- info = std::make_shared<ValidPathInfo>(StorePath(path));
- auto deriver = readString(conn->from);
- if (deriver != "") info->deriver = parseStorePath(deriver);
- info->narHash = Hash(readString(conn->from), htSHA256);
- info->setReferencesPossiblyToSelf(readStorePaths<StorePathSet>(*this, conn->from));
- conn->from >> info->registrationTime >> info->narSize;
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- conn->from >> info->ultimate;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = parseContentAddressOpt(readString(conn->from));
- }
+ info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@@ -469,12 +499,28 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path)
}
-OutputPathMap RemoteStore::queryDerivationOutputMap(const StorePath & path)
+std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivationOutputMap(const StorePath & path)
{
- auto conn(getConnection());
- conn->to << wopQueryDerivationOutputMap << printStorePath(path);
- conn.processStderr();
- return readOutputPathMap(*this, conn->from);
+ if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) {
+ auto conn(getConnection());
+ conn->to << wopQueryDerivationOutputMap << printStorePath(path);
+ conn.processStderr();
+ return worker_proto::read(*this, conn->from, Phantom<std::map<std::string, std::optional<StorePath>>> {});
+ } else {
+ // Fallback for old daemon versions.
+ // For floating-CA derivations (and their co-dependencies) this is an
+ // under-approximation as it only returns the paths that can be inferred
+ // from the derivation itself (and not the ones that are known because
+ // the have been built), but as old stores don't handle floating-CA
+ // derivations this shouldn't matter
+ auto derivation = readDerivation(path);
+ auto outputsWithOptPaths = derivation.outputsAndOptPaths(*this);
+ std::map<std::string, std::optional<StorePath>> ret;
+ for (auto & [outputName, outputAndPath] : outputsWithOptPaths) {
+ ret.emplace(outputName, outputAndPath.second);
+ }
+ return ret;
+ }
}
@@ -489,6 +535,93 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
+ref<const ValidPathInfo> RemoteStore::addCAToStore(
+ Source & dump,
+ const string & name,
+ ContentAddressMethod caMethod,
+ const StorePathSet & references,
+ RepairFlag repair)
+{
+ std::optional<ConnectionHandle> conn_(getConnection());
+ auto & conn = *conn_;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
+
+ conn->to
+ << wopAddToStore
+ << name
+ << renderContentAddressMethod(caMethod);
+ writeStorePaths(*this, conn->to, references);
+ conn->to << repair;
+
+ conn.withFramedSink([&](Sink & sink) {
+ dump.drainInto(sink);
+ });
+
+ auto path = parseStorePath(readString(conn->from));
+ return readValidPathInfo(conn, path);
+ }
+ else {
+ if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
+
+ std::visit(overloaded {
+ [&](TextHashMethod thm) -> void {
+ std::string s = dump.drain();
+ conn->to << wopAddTextToStore << name << s;
+ writeStorePaths(*this, conn->to, references);
+ conn.processStderr();
+ },
+ [&](FixedOutputHashMethod fohm) -> void {
+ conn->to
+ << wopAddToStore
+ << name
+ << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
+ << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
+ << printHashType(fohm.hashType);
+
+ try {
+ conn->to.written = 0;
+ conn->to.warn = true;
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
+ dump.drainInto(conn->to);
+ } else {
+ std::string contents = dump.drain();
+ dumpString(contents, conn->to);
+ }
+ }
+ conn->to.warn = false;
+ conn.processStderr();
+ } catch (SysError & e) {
+ /* Daemon closed while we were sending the path. Probably OOM
+ or I/O error. */
+ if (e.errNo == EPIPE)
+ try {
+ conn.processStderr();
+ } catch (EndOfFile & e) { }
+ throw;
+ }
+
+ }
+ }, caMethod);
+ auto path = parseStorePath(readString(conn->from));
+ // Release our connection to prevent a deadlock in queryPathInfo().
+ conn_.reset();
+ return queryPathInfo(path);
+ }
+}
+
+
+StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashType, RepairFlag repair)
+{
+ StorePathSet references;
+ return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
+}
+
+
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
RepairFlag repair, CheckSigsFlag checkSigs)
{
@@ -522,83 +655,16 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn->to << wopAddToStoreNar
<< printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "")
- << info.narHash->to_string(Base16, false);
+ << info.narHash.to_string(Base16, false);
writeStorePaths(*this, conn->to, info.referencesPossiblyToSelf());
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
-
- std::exception_ptr ex;
-
- struct FramedSink : BufferedSink
- {
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
- };
-
- /* Handle log messages / exceptions from the remote on a
- separate thread. */
- std::thread stderrThread([&]()
- {
- try {
- conn.processStderr();
- } catch (...) {
- ex = std::current_exception();
- }
- });
-
- Finally joinStderrThread([&]()
- {
- if (stderrThread.joinable()) {
- stderrThread.join();
- if (ex) {
- try {
- std::rethrow_exception(ex);
- } catch (...) {
- ignoreException();
- }
- }
- }
- });
-
- {
- FramedSink sink(conn, ex);
+ conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
- sink.flush();
- }
-
- stderrThread.join();
- if (ex)
- std::rethrow_exception(ex);
-
+ });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@@ -609,57 +675,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
}
-StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
- FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
-{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
-
- Path srcPath(absPath(_srcPath));
-
- conn->to
- << wopAddToStore
- << name
- << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
- << (method == FileIngestionMethod::Recursive ? 1 : 0)
- << printHashType(hashAlgo);
-
- try {
- conn->to.written = 0;
- conn->to.warn = true;
- connections->incCapacity();
- {
- Finally cleanup([&]() { connections->decCapacity(); });
- dumpPath(srcPath, conn->to, filter);
- }
- conn->to.warn = false;
- conn.processStderr();
- } catch (SysError & e) {
- /* Daemon closed while we were sending the path. Probably OOM
- or I/O error. */
- if (e.errNo == EPIPE)
- try {
- conn.processStderr();
- } catch (EndOfFile & e) { }
- throw;
- }
-
- return parseStorePath(readString(conn->from));
-}
-
-
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
- conn->to << wopAddTextToStore << name << s;
- writeStorePaths(*this, conn->to, references);
-
- conn.processStderr();
- return parseStorePath(readString(conn->from));
+ StringSource source(s);
+ return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
}
@@ -861,6 +881,18 @@ RemoteStore::Connection::~Connection()
}
}
+void RemoteStore::narFromPath(const StorePath & path, Sink & sink)
+{
+ auto conn(connections->get());
+ conn->to << wopNarFromPath << printStorePath(path);
+ conn->processStderr();
+ copyNAR(conn->from, sink);
+}
+
+ref<FSAccessor> RemoteStore::getFSAccessor()
+{
+ return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this()));
+}
static Logger::Fields readFields(Source & from)
{
@@ -879,9 +911,10 @@ static Logger::Fields readFields(Source & from)
}
-std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source)
+std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source, bool flush)
{
- to.flush();
+ if (flush)
+ to.flush();
while (true) {
@@ -942,14 +975,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
-static std::string uriScheme = "unix://";
-
-static RegisterStoreImplementation regStore([](
- const std::string & uri, const Store::Params & params)
- -> std::shared_ptr<Store>
+void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
{
- if (std::string(uri, 0, uriScheme.size()) != uriScheme) return 0;
- return std::make_shared<UDSRemoteStore>(std::string(uri, uriScheme.size()), params);
-});
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink((*this)->to, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
+
+static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
}