aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc449
1 files changed, 305 insertions, 144 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 118aadf7e..23b1942ce 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -1,5 +1,6 @@
#include "serialise.hh"
#include "util.hh"
+#include "remote-fs-accessor.hh"
#include "remote-store.hh"
#include "worker-protocol.hh"
#include "archive.hh"
@@ -9,6 +10,7 @@
#include "pool.hh"
#include "finally.hh"
#include "logging.hh"
+#include "callback.hh"
#include <sys/types.h>
#include <sys/stat.h>
@@ -22,53 +24,82 @@
namespace nix {
+namespace worker_proto {
-template<> StorePathSet readStorePaths(const Store & store, Source & from)
+std::string read(const Store & store, Source & from, Phantom<std::string> _)
{
- StorePathSet paths;
- for (auto & i : readStrings<Strings>(from))
- paths.insert(store.parseStorePath(i));
- return paths;
+ return readString(from);
}
+void write(const Store & store, Sink & out, const std::string & str)
+{
+ out << str;
+}
-void writeStorePaths(const Store & store, Sink & out, const StorePathSet & paths)
+
+StorePath read(const Store & store, Source & from, Phantom<StorePath> _)
{
- out << paths.size();
- for (auto & i : paths)
- out << store.printStorePath(i);
+ return store.parseStorePath(readString(from));
}
-std::map<string, StorePath> readOutputPathMap(const Store & store, Source & from)
+void write(const Store & store, Sink & out, const StorePath & storePath)
{
- std::map<string, StorePath> pathMap;
- auto rawInput = readStrings<Strings>(from);
- if (rawInput.size() % 2)
- throw Error("got an odd number of elements from the daemon when trying to read a output path map");
- auto curInput = rawInput.begin();
- while (curInput != rawInput.end()) {
- auto thisKey = *curInput++;
- auto thisValue = *curInput++;
- pathMap.emplace(thisKey, store.parseStorePath(thisValue));
- }
- return pathMap;
+ out << store.printStorePath(storePath);
}
-void writeOutputPathMap(const Store & store, Sink & out, const std::map<string, StorePath> & pathMap)
+
+ContentAddress read(const Store & store, Source & from, Phantom<ContentAddress> _)
{
- out << 2*pathMap.size();
- for (auto & i : pathMap) {
- out << i.first;
- out << store.printStorePath(i.second);
- }
+ return parseContentAddress(readString(from));
}
+void write(const Store & store, Sink & out, const ContentAddress & ca)
+{
+ out << renderContentAddress(ca);
+}
+
+
+std::optional<StorePath> read(const Store & store, Source & from, Phantom<std::optional<StorePath>> _)
+{
+ auto s = readString(from);
+ return s == "" ? std::optional<StorePath> {} : store.parseStorePath(s);
+}
+
+void write(const Store & store, Sink & out, const std::optional<StorePath> & storePathOpt)
+{
+ out << (storePathOpt ? store.printStorePath(*storePathOpt) : "");
+}
+
+
+std::optional<ContentAddress> read(const Store & store, Source & from, Phantom<std::optional<ContentAddress>> _)
+{
+ return parseContentAddressOpt(readString(from));
+}
+
+void write(const Store & store, Sink & out, const std::optional<ContentAddress> & caOpt)
+{
+ out << (caOpt ? renderContentAddress(*caOpt) : "");
+}
+
+}
+
+
/* TODO: Separate these store impls into different files, give them better names */
RemoteStore::RemoteStore(const Params & params)
: Store(params)
+ , RemoteStoreConfig(params)
, connections(make_ref<Pool<Connection>>(
std::max(1, (int) maxConnections),
- [this]() { return openConnectionWrapper(); },
+ [this]() {
+ auto conn = openConnectionWrapper();
+ try {
+ initConnection(*conn);
+ } catch (...) {
+ failed = true;
+ throw;
+ }
+ return conn;
+ },
[this](const ref<Connection> & r) {
return
r->to.good()
@@ -95,19 +126,21 @@ ref<RemoteStore::Connection> RemoteStore::openConnectionWrapper()
UDSRemoteStore::UDSRemoteStore(const Params & params)
- : Store(params)
+ : StoreConfig(params)
+ , Store(params)
, LocalFSStore(params)
, RemoteStore(params)
{
}
-UDSRemoteStore::UDSRemoteStore(std::string socket_path, const Params & params)
- : Store(params)
- , LocalFSStore(params)
- , RemoteStore(params)
- , path(socket_path)
+UDSRemoteStore::UDSRemoteStore(
+ const std::string scheme,
+ std::string socket_path,
+ const Params & params)
+ : UDSRemoteStore(params)
{
+ path.emplace(socket_path);
}
@@ -151,8 +184,6 @@ ref<RemoteStore::Connection> UDSRemoteStore::openConnection()
conn->startTime = std::chrono::steady_clock::now();
- initConnection(*conn);
-
return conn;
}
@@ -260,14 +291,16 @@ struct ConnectionHandle
RemoteStore::Connection * operator -> () { return &*handle; }
- void processStderr(Sink * sink = 0, Source * source = 0)
+ void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true)
{
- auto ex = handle->processStderr(sink, source);
+ auto ex = handle->processStderr(sink, source, flush);
if (ex) {
daemonException = true;
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -296,9 +329,9 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute
return res;
} else {
conn->to << wopQueryValidPaths;
- writeStorePaths(*this, conn->to, paths);
+ worker_proto::write(*this, conn->to, paths);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
}
@@ -308,7 +341,7 @@ StorePathSet RemoteStore::queryAllValidPaths()
auto conn(getConnection());
conn->to << wopQueryAllValidPaths;
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
@@ -325,41 +358,46 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
return res;
} else {
conn->to << wopQuerySubstitutablePaths;
- writeStorePaths(*this, conn->to, paths);
+ worker_proto::write(*this, conn->to, paths);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
}
-void RemoteStore::querySubstitutablePathInfos(const StorePathSet & paths,
- SubstitutablePathInfos & infos)
+void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, SubstitutablePathInfos & infos)
{
- if (paths.empty()) return;
+ if (pathsMap.empty()) return;
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
- for (auto & i : paths) {
+ for (auto & i : pathsMap) {
SubstitutablePathInfo info;
- conn->to << wopQuerySubstitutablePathInfo << printStorePath(i);
+ conn->to << wopQuerySubstitutablePathInfo << printStorePath(i.first);
conn.processStderr();
unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
auto deriver = readString(conn->from);
if (deriver != "")
info.deriver = parseStorePath(deriver);
- info.references = readStorePaths<StorePathSet>(*this, conn->from);
+ info.references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
- infos.insert_or_assign(i, std::move(info));
+ infos.insert_or_assign(i.first, std::move(info));
}
} else {
conn->to << wopQuerySubstitutablePathInfos;
- writeStorePaths(*this, conn->to, paths);
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) {
+ StorePathSet paths;
+ for (auto & path : pathsMap)
+ paths.insert(path.first);
+ worker_proto::write(*this, conn->to, paths);
+ } else
+ worker_proto::write(*this, conn->to, pathsMap);
conn.processStderr();
size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) {
@@ -367,7 +405,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathSet & paths,
auto deriver = readString(conn->from);
if (deriver != "")
info.deriver = parseStorePath(deriver);
- info.references = readStorePaths<StorePathSet>(*this, conn->from);
+ info.references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
}
@@ -376,11 +414,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathSet & paths,
}
+ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
+{
+ auto deriver = readString(conn->from);
+ auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
+ auto info = make_ref<ValidPathInfo>(path, narHash);
+ if (deriver != "") info->deriver = parseStorePath(deriver);
+ info->references = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ conn->from >> info->registrationTime >> info->narSize;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ conn->from >> info->ultimate;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = parseContentAddressOpt(readString(conn->from));
+ }
+ return info;
+}
+
+
void RemoteStore::queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
{
try {
- std::shared_ptr<ValidPathInfo> info;
+ std::shared_ptr<const ValidPathInfo> info;
{
auto conn(getConnection());
conn->to << wopQueryPathInfo << printStorePath(path);
@@ -396,17 +451,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
bool valid; conn->from >> valid;
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
}
- info = std::make_shared<ValidPathInfo>(StorePath(path));
- auto deriver = readString(conn->from);
- if (deriver != "") info->deriver = parseStorePath(deriver);
- info->narHash = Hash(readString(conn->from), htSHA256);
- info->references = readStorePaths<StorePathSet>(*this, conn->from);
- conn->from >> info->registrationTime >> info->narSize;
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- conn->from >> info->ultimate;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = parseContentAddressOpt(readString(conn->from));
- }
+ info = readValidPathInfo(conn, path);
}
callback(std::move(info));
} catch (...) { callback.rethrow(); }
@@ -419,7 +464,7 @@ void RemoteStore::queryReferrers(const StorePath & path,
auto conn(getConnection());
conn->to << wopQueryReferrers << printStorePath(path);
conn.processStderr();
- for (auto & i : readStorePaths<StorePathSet>(*this, conn->from))
+ for (auto & i : worker_proto::read(*this, conn->from, Phantom<StorePathSet> {}))
referrers.insert(i);
}
@@ -429,7 +474,7 @@ StorePathSet RemoteStore::queryValidDerivers(const StorePath & path)
auto conn(getConnection());
conn->to << wopQueryValidDerivers << printStorePath(path);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
@@ -441,17 +486,32 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path)
}
conn->to << wopQueryDerivationOutputs << printStorePath(path);
conn.processStderr();
- return readStorePaths<StorePathSet>(*this, conn->from);
+ return worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
}
-OutputPathMap RemoteStore::queryDerivationOutputMap(const StorePath & path)
+std::map<std::string, std::optional<StorePath>> RemoteStore::queryPartialDerivationOutputMap(const StorePath & path)
{
- auto conn(getConnection());
- conn->to << wopQueryDerivationOutputMap << printStorePath(path);
- conn.processStderr();
- return readOutputPathMap(*this, conn->from);
-
+ if (GET_PROTOCOL_MINOR(getProtocol()) >= 0x16) {
+ auto conn(getConnection());
+ conn->to << wopQueryDerivationOutputMap << printStorePath(path);
+ conn.processStderr();
+ return worker_proto::read(*this, conn->from, Phantom<std::map<std::string, std::optional<StorePath>>> {});
+ } else {
+ // Fallback for old daemon versions.
+ // For floating-CA derivations (and their co-dependencies) this is an
+ // under-approximation as it only returns the paths that can be inferred
+ // from the derivation itself (and not the ones that are known because
+ // the have been built), but as old stores don't handle floating-CA
+ // derivations this shouldn't matter
+ auto derivation = readDerivation(path);
+ auto outputsWithOptPaths = derivation.outputsAndOptPaths(*this);
+ std::map<std::string, std::optional<StorePath>> ret;
+ for (auto & [outputName, outputAndPath] : outputsWithOptPaths) {
+ ret.emplace(outputName, outputAndPath.second);
+ }
+ return ret;
+ }
}
std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string & hashPart)
@@ -465,8 +525,95 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
}
+ref<const ValidPathInfo> RemoteStore::addCAToStore(
+ Source & dump,
+ const string & name,
+ ContentAddressMethod caMethod,
+ const StorePathSet & references,
+ RepairFlag repair)
+{
+ std::optional<ConnectionHandle> conn_(getConnection());
+ auto & conn = *conn_;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
+
+ conn->to
+ << wopAddToStore
+ << name
+ << renderContentAddressMethod(caMethod);
+ worker_proto::write(*this, conn->to, references);
+ conn->to << repair;
+
+ conn.withFramedSink([&](Sink & sink) {
+ dump.drainInto(sink);
+ });
+
+ auto path = parseStorePath(readString(conn->from));
+ return readValidPathInfo(conn, path);
+ }
+ else {
+ if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
+
+ std::visit(overloaded {
+ [&](TextHashMethod thm) -> void {
+ std::string s = dump.drain();
+ conn->to << wopAddTextToStore << name << s;
+ worker_proto::write(*this, conn->to, references);
+ conn.processStderr();
+ },
+ [&](FixedOutputHashMethod fohm) -> void {
+ conn->to
+ << wopAddToStore
+ << name
+ << ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
+ << (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
+ << printHashType(fohm.hashType);
+
+ try {
+ conn->to.written = 0;
+ conn->to.warn = true;
+ connections->incCapacity();
+ {
+ Finally cleanup([&]() { connections->decCapacity(); });
+ if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
+ dump.drainInto(conn->to);
+ } else {
+ std::string contents = dump.drain();
+ dumpString(contents, conn->to);
+ }
+ }
+ conn->to.warn = false;
+ conn.processStderr();
+ } catch (SysError & e) {
+ /* Daemon closed while we were sending the path. Probably OOM
+ or I/O error. */
+ if (e.errNo == EPIPE)
+ try {
+ conn.processStderr();
+ } catch (EndOfFile & e) { }
+ throw;
+ }
+
+ }
+ }, caMethod);
+ auto path = parseStorePath(readString(conn->from));
+ // Release our connection to prevent a deadlock in queryPathInfo().
+ conn_.reset();
+ return queryPathInfo(path);
+ }
+}
+
+
+StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
+ FileIngestionMethod method, HashType hashType, RepairFlag repair)
+{
+ StorePathSet references;
+ return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
+}
+
+
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
- RepairFlag repair, CheckSigsFlag checkSigs, std::shared_ptr<FSAccessor> accessor)
+ RepairFlag repair, CheckSigsFlag checkSigs)
{
auto conn(getConnection());
@@ -480,7 +627,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
sink
<< exportMagic
<< printStorePath(info.path);
- writeStorePaths(*this, sink, info.references);
+ worker_proto::write(*this, sink, info.references);
sink
<< (info.deriver ? printStorePath(*info.deriver) : "")
<< 0 // == no legacy signature
@@ -490,7 +637,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn.processStderr(0, source2.get());
- auto importedPaths = readStorePaths<StorePathSet>(*this, conn->from);
+ auto importedPaths = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
assert(importedPaths.size() <= 1);
}
@@ -499,68 +646,30 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "")
<< info.narHash.to_string(Base16, false);
- writeStorePaths(*this, conn->to, info.references);
+ worker_proto::write(*this, conn->to, info.references);
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
- bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
- if (!tunnel) copyNAR(source, conn->to);
- conn.processStderr(0, tunnel ? &source : nullptr);
- }
-}
-
-StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
- FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
-{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
-
- Path srcPath(absPath(_srcPath));
-
- conn->to
- << wopAddToStore
- << name
- << ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
- << (method == FileIngestionMethod::Recursive ? 1 : 0)
- << printHashType(hashAlgo);
-
- try {
- conn->to.written = 0;
- conn->to.warn = true;
- connections->incCapacity();
- {
- Finally cleanup([&]() { connections->decCapacity(); });
- dumpPath(srcPath, conn->to, filter);
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
+ conn.withFramedSink([&](Sink & sink) {
+ copyNAR(source, sink);
+ });
+ } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
+ conn.processStderr(0, &source);
+ } else {
+ copyNAR(source, conn->to);
+ conn.processStderr(0, nullptr);
}
- conn->to.warn = false;
- conn.processStderr();
- } catch (SysError & e) {
- /* Daemon closed while we were sending the path. Probably OOM
- or I/O error. */
- if (e.errNo == EPIPE)
- try {
- conn.processStderr();
- } catch (EndOfFile & e) { }
- throw;
}
-
- return parseStorePath(readString(conn->from));
}
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
const StorePathSet & references, RepairFlag repair)
{
- if (repair) throw Error("repairing is not supported when building through the Nix daemon");
-
- auto conn(getConnection());
- conn->to << wopAddTextToStore << name << s;
- writeStorePaths(*this, conn->to, references);
-
- conn.processStderr();
- return parseStorePath(readString(conn->from));
+ StringSource source(s);
+ return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
}
@@ -659,7 +768,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
conn->to
<< wopCollectGarbage << options.action;
- writeStorePaths(*this, conn->to, options.pathsToDelete);
+ worker_proto::write(*this, conn->to, options.pathsToDelete);
conn->to << options.ignoreLiveness
<< options.maxFreed
/* removed options */
@@ -707,7 +816,7 @@ void RemoteStore::addSignatures(const StorePath & storePath, const StringSet & s
void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets,
StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown,
- unsigned long long & downloadSize, unsigned long long & narSize)
+ uint64_t & downloadSize, uint64_t & narSize)
{
{
auto conn(getConnection());
@@ -721,9 +830,9 @@ void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets
ss.push_back(p.to_string(*this));
conn->to << ss;
conn.processStderr();
- willBuild = readStorePaths<StorePathSet>(*this, conn->from);
- willSubstitute = readStorePaths<StorePathSet>(*this, conn->from);
- unknown = readStorePaths<StorePathSet>(*this, conn->from);
+ willBuild = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ willSubstitute = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
+ unknown = worker_proto::read(*this, conn->from, Phantom<StorePathSet> {});
conn->from >> downloadSize >> narSize;
return;
}
@@ -762,6 +871,18 @@ RemoteStore::Connection::~Connection()
}
}
+void RemoteStore::narFromPath(const StorePath & path, Sink & sink)
+{
+ auto conn(connections->get());
+ conn->to << wopNarFromPath << printStorePath(path);
+ conn->processStderr();
+ copyNAR(conn->from, sink);
+}
+
+ref<FSAccessor> RemoteStore::getFSAccessor()
+{
+ return make_ref<RemoteFSAccessor>(ref<Store>(shared_from_this()));
+}
static Logger::Fields readFields(Source & from)
{
@@ -780,9 +901,10 @@ static Logger::Fields readFields(Source & from)
}
-std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source)
+std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source * source, bool flush)
{
- to.flush();
+ if (flush)
+ to.flush();
while (true) {
@@ -803,9 +925,13 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
}
else if (msg == STDERR_ERROR) {
- string error = readString(from);
- unsigned int status = readInt(from);
- return std::make_exception_ptr(Error(status, error));
+ if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) {
+ return std::make_exception_ptr(readError(from));
+ } else {
+ string error = readString(from);
+ unsigned int status = readInt(from);
+ return std::make_exception_ptr(Error(status, error));
+ }
}
else if (msg == STDERR_NEXT)
@@ -843,14 +969,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
-static std::string uriScheme = "unix://";
-
-static RegisterStoreImplementation regStore([](
- const std::string & uri, const Store::Params & params)
- -> std::shared_ptr<Store>
+void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
{
- if (std::string(uri, 0, uriScheme.size()) != uriScheme) return 0;
- return std::make_shared<UDSRemoteStore>(std::string(uri, uriScheme.size()), params);
-});
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink((*this)->to, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
+
+static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regUDSRemoteStore;
}