aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/daemon.cc
diff options
context:
space:
mode:
authorJohn Ericson <John.Ericson@Obsidian.Systems>2023-04-17 13:40:46 -0400
committerJohn Ericson <John.Ericson@Obsidian.Systems>2023-06-19 12:08:23 -0400
commit9f69b7dee9fc6035b8aa0cc718f5e74af460d9aa (patch)
tree167c44235e63dd0ed73b7ee3497ee04ecccfda86 /src/libstore/daemon.cc
parent4e8b495ad7dddabc35bf9d6afe3573426ffed15d (diff)
Create `worker_proto::{Read,Write}Conn`
Pass this around instead of `Source &` and `Sink &` directly. This will give us something to put the protocol version on once the time comes. To do this ergonomically, we need to expose `RemoteStore::Connection`, so do that too. Give it some more API docs while we are at it.
Diffstat (limited to 'src/libstore/daemon.cc')
-rw-r--r--src/libstore/daemon.cc64
1 files changed, 34 insertions, 30 deletions
diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc
index 7eba1a79d..75c3d2aca 100644
--- a/src/libstore/daemon.cc
+++ b/src/libstore/daemon.cc
@@ -260,13 +260,13 @@ struct ClientSettings
}
};
-static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, Source & from)
+static std::vector<DerivedPath> readDerivedPaths(Store & store, unsigned int clientVersion, WorkerProto::ReadConn conn)
{
std::vector<DerivedPath> reqs;
if (GET_PROTOCOL_MINOR(clientVersion) >= 30) {
- reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, from);
+ reqs = WorkerProto::Serialise<std::vector<DerivedPath>>::read(store, conn);
} else {
- for (auto & s : readStrings<Strings>(from))
+ for (auto & s : readStrings<Strings>(conn.from))
reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath());
}
return reqs;
@@ -276,6 +276,9 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion,
Source & from, BufferedSink & to, WorkerProto::Op op)
{
+ WorkerProto::ReadConn rconn { .from = from };
+ WorkerProto::WriteConn wconn { .to = to };
+
switch (op) {
case WorkerProto::Op::IsValidPath: {
@@ -288,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QueryValidPaths: {
- auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
SubstituteFlag substitute = NoSubstitute;
if (GET_PROTOCOL_MINOR(clientVersion) >= 27) {
@@ -301,7 +304,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
auto res = store->queryValidPaths(paths, substitute);
logger->stopWork();
- WorkerProto::write(*store, to, res);
+ WorkerProto::write(*store, wconn, res);
break;
}
@@ -317,11 +320,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QuerySubstitutablePaths: {
- auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork();
auto res = store->querySubstitutablePaths(paths);
logger->stopWork();
- WorkerProto::write(*store, to, res);
+ WorkerProto::write(*store, wconn, res);
break;
}
@@ -350,7 +353,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
paths = store->queryValidDerivers(path);
else paths = store->queryDerivationOutputs(path);
logger->stopWork();
- WorkerProto::write(*store, to, paths);
+ WorkerProto::write(*store, wconn, paths);
break;
}
@@ -368,7 +371,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
auto outputs = store->queryPartialDerivationOutputMap(path);
logger->stopWork();
- WorkerProto::write(*store, to, outputs);
+ WorkerProto::write(*store, wconn, outputs);
break;
}
@@ -394,7 +397,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) >= 25) {
auto name = readString(from);
auto camStr = readString(from);
- auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
bool repairBool;
from >> repairBool;
auto repair = RepairFlag{repairBool};
@@ -496,7 +499,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::AddTextToStore: {
std::string suffix = readString(from);
std::string s = readString(from);
- auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ auto refs = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
logger->startWork();
auto path = store->addTextToStore(suffix, s, refs, NoRepair);
logger->stopWork();
@@ -528,7 +531,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::BuildPaths: {
- auto drvs = readDerivedPaths(*store, clientVersion, from);
+ auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal;
if (GET_PROTOCOL_MINOR(clientVersion) >= 15) {
mode = (BuildMode) readInt(from);
@@ -553,7 +556,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::BuildPathsWithResults: {
- auto drvs = readDerivedPaths(*store, clientVersion, from);
+ auto drvs = readDerivedPaths(*store, clientVersion, rconn);
BuildMode mode = bmNormal;
mode = (BuildMode) readInt(from);
@@ -568,7 +571,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
auto results = store->buildPathsWithResults(drvs, mode);
logger->stopWork();
- WorkerProto::write(*store, to, results);
+ WorkerProto::write(*store, wconn, results);
break;
}
@@ -645,7 +648,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
DrvOutputs builtOutputs;
for (auto & [output, realisation] : res.builtOutputs)
builtOutputs.insert_or_assign(realisation.id, realisation);
- WorkerProto::write(*store, to, builtOutputs);
+ WorkerProto::write(*store, wconn, builtOutputs);
}
break;
}
@@ -710,7 +713,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
case WorkerProto::Op::CollectGarbage: {
GCOptions options;
options.action = (GCOptions::GCAction) readInt(from);
- options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ options.pathsToDelete = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> options.ignoreLiveness >> options.maxFreed;
// obsolete fields
readInt(from);
@@ -780,7 +783,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
else {
to << 1
<< (i->second.deriver ? store->printStorePath(*i->second.deriver) : "");
- WorkerProto::write(*store, to, i->second.references);
+ WorkerProto::write(*store, wconn, i->second.references);
to << i->second.downloadSize
<< i->second.narSize;
}
@@ -791,11 +794,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
SubstitutablePathInfos infos;
StorePathCAMap pathsMap = {};
if (GET_PROTOCOL_MINOR(clientVersion) < 22) {
- auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ auto paths = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
for (auto & path : paths)
pathsMap.emplace(path, std::nullopt);
} else
- pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, from);
+ pathsMap = WorkerProto::Serialise<StorePathCAMap>::read(*store, rconn);
logger->startWork();
store->querySubstitutablePathInfos(pathsMap, infos);
logger->stopWork();
@@ -803,7 +806,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
for (auto & i : infos) {
to << store->printStorePath(i.first)
<< (i.second.deriver ? store->printStorePath(*i.second.deriver) : "");
- WorkerProto::write(*store, to, i.second.references);
+ WorkerProto::write(*store, wconn, i.second.references);
to << i.second.downloadSize << i.second.narSize;
}
break;
@@ -813,7 +816,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
logger->startWork();
auto paths = store->queryAllValidPaths();
logger->stopWork();
- WorkerProto::write(*store, to, paths);
+ WorkerProto::write(*store, wconn, paths);
break;
}
@@ -885,7 +888,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
ValidPathInfo info { path, narHash };
if (deriver != "")
info.deriver = store->parseStorePath(deriver);
- info.references = WorkerProto::Serialise<StorePathSet>::read(*store, from);
+ info.references = WorkerProto::Serialise<StorePathSet>::read(*store, rconn);
from >> info.registrationTime >> info.narSize >> info.ultimate;
info.sigs = readStrings<StringSet>(from);
info.ca = ContentAddress::parseOpt(readString(from));
@@ -930,15 +933,15 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
}
case WorkerProto::Op::QueryMissing: {
- auto targets = readDerivedPaths(*store, clientVersion, from);
+ auto targets = readDerivedPaths(*store, clientVersion, rconn);
logger->startWork();
StorePathSet willBuild, willSubstitute, unknown;
uint64_t downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
logger->stopWork();
- WorkerProto::write(*store, to, willBuild);
- WorkerProto::write(*store, to, willSubstitute);
- WorkerProto::write(*store, to, unknown);
+ WorkerProto::write(*store, wconn, willBuild);
+ WorkerProto::write(*store, wconn, willSubstitute);
+ WorkerProto::write(*store, wconn, unknown);
to << downloadSize << narSize;
break;
}
@@ -951,7 +954,7 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
store->registerDrvOutput(Realisation{
.id = outputId, .outPath = outputPath});
} else {
- auto realisation = WorkerProto::Serialise<Realisation>::read(*store, from);
+ auto realisation = WorkerProto::Serialise<Realisation>::read(*store, rconn);
store->registerDrvOutput(realisation);
}
logger->stopWork();
@@ -966,11 +969,11 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (GET_PROTOCOL_MINOR(clientVersion) < 31) {
std::set<StorePath> outPaths;
if (info) outPaths.insert(info->outPath);
- WorkerProto::write(*store, to, outPaths);
+ WorkerProto::write(*store, wconn, outPaths);
} else {
std::set<Realisation> realisations;
if (info) realisations.insert(*info);
- WorkerProto::write(*store, to, realisations);
+ WorkerProto::write(*store, wconn, realisations);
}
break;
}
@@ -1050,7 +1053,8 @@ void processConnection(
auto temp = trusted
? store->isTrustedClient()
: std::optional { NotTrusted };
- WorkerProto::write(*store, to, temp);
+ WorkerProto::WriteConn wconn { .to = to };
+ WorkerProto::write(*store, wconn, temp);
}
/* Send startup error messages to the client. */