aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/remote-store.cc29
-rw-r--r--src/libstore/store-api.cc80
2 files changed, 62 insertions, 47 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 56b6093bc..1b0524316 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -485,17 +485,26 @@ void RemoteStore::addMultipleToStore(
{
auto remoteVersion = getProtocol();
- auto source = sinkToSource([&](Sink & sink) {
- sink << pathsToCopy.size();
- for (auto & [pathInfo, pathSource] : pathsToCopy) {
- sink << WorkerProto::Serialise<ValidPathInfo>::write(*this,
- WorkerProto::WriteConn {remoteVersion},
- pathInfo);
- pathSource->drainInto(sink);
- }
- });
+ GeneratorSource source{
+ [](auto self, auto & pathsToCopy, auto remoteVersion) -> WireFormatGenerator {
+ co_yield pathsToCopy.size();
+ for (auto & [pathInfo, pathSource] : pathsToCopy) {
+ co_yield WorkerProto::Serialise<ValidPathInfo>::write(*self,
+ WorkerProto::WriteConn {remoteVersion},
+ pathInfo);
+ try {
+ char buf[65536];
+ while (true) {
+ const auto read = pathSource->read(buf, sizeof(buf));
+ co_yield std::span{buf, read};
+ }
+ } catch (EndOfFile &) {
+ }
+ }
+ }(this, pathsToCopy, remoteVersion)
+ };
- addMultipleToStore(*source, repair, checkSigs);
+ addMultipleToStore(source, repair, checkSigs);
}
void RemoteStore::addMultipleToStore(
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 28a414555..cb027d311 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -335,9 +335,9 @@ void Store::addMultipleToStore(
info.ultimate = false;
/* Make sure that the Source object is destroyed when
- we're done. In particular, a SinkToSource object must
- be destroyed to ensure that the destructors on its
- stack frame are run; this includes
+ we're done. In particular, a coroutine object must
+ be destroyed to ensure that the destructors in its
+ state are run; this includes
LegacySSHStore::narFromPath()'s connection lock. */
auto source = std::move(source_);
@@ -1059,16 +1059,19 @@ void copyStorePath(
info = info2;
}
- auto source = sinkToSource([&](Sink & sink) {
- LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable {
- total += data.size();
- act.progress(total, info->narSize);
- });
- TeeSink tee { sink, progressSink };
- tee << srcStore.narFromPath(storePath);
- });
+ GeneratorSource source{
+ [](auto & act, auto & info, auto & srcStore, auto & storePath) -> WireFormatGenerator {
+ auto nar = srcStore.narFromPath(storePath);
+ uint64_t total = 0;
+ while (auto data = nar.next()) {
+ total += data->size();
+ act.progress(total, info->narSize);
+ co_yield *data;
+ }
+ }(act, info, srcStore, storePath)
+ };
- dstStore.addToStore(*info, *source, repair, checkSigs);
+ dstStore.addToStore(*info, source, repair, checkSigs);
}
@@ -1180,31 +1183,34 @@ std::map<StorePath, StorePath> copyPaths(
ValidPathInfo infoForDst = *info;
infoForDst.path = storePathForDst;
- auto source =
- sinkToSource([&srcStore, &dstStore, missingPath = missingPath, info = std::move(info)](Sink & sink) {
- // We can reasonably assume that the copy will happen whenever we
- // read the path, so log something about that at that point
- auto srcUri = srcStore.getUri();
- auto dstUri = dstStore.getUri();
- auto storePathS = srcStore.printStorePath(missingPath);
- Activity act(
- *logger,
- lvlInfo,
- actCopyPath,
- makeCopyPathMessage(srcUri, dstUri, storePathS),
- {storePathS, srcUri, dstUri}
- );
- PushActivity pact(act.id);
-
- LambdaSink progressSink([&, total = 0ULL](std::string_view data) mutable {
- total += data.size();
- act.progress(total, info->narSize);
- });
- TeeSink tee{sink, progressSink};
-
- tee << srcStore.narFromPath(missingPath);
- });
- pathsToCopy.push_back(std::pair{infoForDst, std::move(source)});
+ auto source = [](auto & srcStore, auto & dstStore, auto missingPath, auto info
+ ) -> WireFormatGenerator {
+ // We can reasonably assume that the copy will happen whenever we
+ // read the path, so log something about that at that point
+ auto srcUri = srcStore.getUri();
+ auto dstUri = dstStore.getUri();
+ auto storePathS = srcStore.printStorePath(missingPath);
+ Activity act(
+ *logger,
+ lvlInfo,
+ actCopyPath,
+ makeCopyPathMessage(srcUri, dstUri, storePathS),
+ {storePathS, srcUri, dstUri}
+ );
+ PushActivity pact(act.id);
+
+ auto nar = srcStore.narFromPath(missingPath);
+ uint64_t total = 0;
+ while (auto data = nar.next()) {
+ total += data->size();
+ act.progress(total, info->narSize);
+ co_yield *data;
+ }
+ };
+ pathsToCopy.push_back(std::pair{
+ infoForDst,
+ std::make_unique<GeneratorSource>(source(srcStore, dstStore, missingPath, info))
+ });
}
dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);