aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc138
1 files changed, 125 insertions, 13 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 3b3cdbc02..1200ab200 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -39,13 +39,45 @@ void writeStorePaths(const Store & store, Sink & out, const StorePathSet & paths
out << store.printStorePath(i);
}
-
StorePath read(const Store & store, Source & from, Proxy<StorePath> _)
{
auto path = readString(from);
return store.parseStorePath(path);
}
+StorePathCAMap readStorePathCAMap(const Store & store, Source & from)
+{
+ StorePathCAMap paths;
+ auto count = readNum<size_t>(from);
+ while (count--)
+ paths.insert_or_assign(store.parseStorePath(readString(from)), parseContentAddressOpt(readString(from)));
+ return paths;
+}
+
+void writeStorePathCAMap(const Store & store, Sink & out, const StorePathCAMap & paths)
+{
+ out << paths.size();
+ for (auto & i : paths) {
+ out << store.printStorePath(i.first);
+ out << renderContentAddress(i.second);
+ }
+}
+
+std::map<string, StorePath> readOutputPathMap(const Store & store, Source & from)
+{
+ std::map<string, StorePath> pathMap;
+ auto rawInput = readStrings<Strings>(from);
+ if (rawInput.size() % 2)
+ throw Error("got an odd number of elements from the daemon when trying to read a output path map");
+ auto curInput = rawInput.begin();
+ while (curInput != rawInput.end()) {
+ auto thisKey = *curInput++;
+ auto thisValue = *curInput++;
+ pathMap.emplace(thisKey, store.parseStorePath(thisValue));
+ }
+ return pathMap;
+}
+
void write(const Store & store, Sink & out, const StorePath & storePath)
{
@@ -323,18 +355,17 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
}
-void RemoteStore::querySubstitutablePathInfos(const StorePathSet & paths,
- SubstitutablePathInfos & infos)
+void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, SubstitutablePathInfos & infos)
{
- if (paths.empty()) return;
+ if (pathsMap.empty()) return;
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
- for (auto & i : paths) {
+ for (auto & i : pathsMap) {
SubstitutablePathInfo info;
- conn->to << wopQuerySubstitutablePathInfo << printStorePath(i);
+ conn->to << wopQuerySubstitutablePathInfo << printStorePath(i.first);
conn.processStderr();
unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
@@ -344,13 +375,19 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathSet & paths,
info.references = readStorePaths<StorePathSet>(*this, conn->from);
info.downloadSize = readLongLong(conn->from);
info.narSize = readLongLong(conn->from);
- infos.insert_or_assign(i, std::move(info));
+ infos.insert_or_assign(i.first, std::move(info));
}
} else {
conn->to << wopQuerySubstitutablePathInfos;
- writeStorePaths(*this, conn->to, paths);
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) {
+ StorePathSet paths;
+ for (auto & path : pathsMap)
+ paths.insert(path.first);
+ writeStorePaths(*this, conn->to, paths);
+ } else
+ writeStorePathCAMap(*this, conn->to, pathsMap);
conn.processStderr();
size_t count = readNum<size_t>(conn->from);
for (size_t n = 0; n < count; n++) {
@@ -489,14 +526,89 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn->to << wopAddToStoreNar
<< printStorePath(info.path)
<< (info.deriver ? printStorePath(*info.deriver) : "")
- << info.narHash.to_string(Base16, false);
+ << info.narHash->to_string(Base16, false);
writeStorePaths(*this, conn->to, info.references);
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
- bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
- if (!tunnel) copyNAR(source, conn->to);
- conn.processStderr(0, tunnel ? &source : nullptr);
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
+
+ std::exception_ptr ex;
+
+ struct FramedSink : BufferedSink
+ {
+ ConnectionHandle & conn;
+ std::exception_ptr & ex;
+
+ FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
+ { }
+
+ ~FramedSink()
+ {
+ try {
+ conn->to << 0;
+ conn->to.flush();
+ } catch (...) {
+ ignoreException();
+ }
+ }
+
+ void write(const unsigned char * data, size_t len) override
+ {
+ /* Don't send more data if the remote has
+ encountered an error. */
+ if (ex) {
+ auto ex2 = ex;
+ ex = nullptr;
+ std::rethrow_exception(ex2);
+ }
+ conn->to << len;
+ conn->to(data, len);
+ };
+ };
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ conn.processStderr();
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink(conn, ex);
+ copyNAR(source, sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+ } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
+ conn.processStderr(0, &source);
+ } else {
+ copyNAR(source, conn->to);
+ conn.processStderr(0, nullptr);
+ }
}
}
@@ -698,7 +810,7 @@ void RemoteStore::addSignatures(const StorePath & storePath, const StringSet & s
void RemoteStore::queryMissing(const std::vector<StorePathWithOutputs> & targets,
StorePathSet & willBuild, StorePathSet & willSubstitute, StorePathSet & unknown,
- unsigned long long & downloadSize, unsigned long long & narSize)
+ uint64_t & downloadSize, uint64_t & narSize)
{
{
auto conn(getConnection());