aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
authorThéophane Hufschmitt <theophane.hufschmitt@tweag.io>2022-06-03 17:01:16 +0200
committerThéophane Hufschmitt <theophane.hufschmitt@tweag.io>2022-06-03 17:01:16 +0200
commit95f47c28fb8786f8d8d529192465bb6ec20db46b (patch)
treeb471a7f9387106a8e48723c253d138853b2a6d64 /src/libstore
parent1dd7253133c4dfd2e7a16ad6fe505442cef38a5b (diff)
Make nix copy parallel again
FILLME
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/store-api.cc207
-rw-r--r--src/libstore/store-api.hh7
2 files changed, 117 insertions, 97 deletions
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 8861274a2..008451666 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -258,6 +258,86 @@ StorePath Store::addToStore(
return addToStoreFromDump(*source, name, method, hashAlgo, repair, references);
}
+void Store::addMultipleToStore(
+ std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy,
+ Activity & act,
+ RepairFlag repair,
+ CheckSigsFlag checkSigs)
+{
+ std::atomic<size_t> nrDone{0};
+ std::atomic<size_t> nrFailed{0};
+ std::atomic<uint64_t> bytesExpected{0};
+ std::atomic<uint64_t> nrRunning{0};
+
+ using PathWithInfo = std::pair<ValidPathInfo, std::unique_ptr<Source>>;
+
+ std::map<StorePath, PathWithInfo*> infosMap;
+ StorePathSet storePathsToAdd;
+ for (auto & thingToAdd : pathsToCopy) {
+ infosMap.insert_or_assign(thingToAdd.first.path, &thingToAdd);
+ storePathsToAdd.insert(thingToAdd.first.path);
+ }
+
+ auto showProgress = [&]() {
+ act.progress(nrDone, pathsToCopy.size(), nrRunning, nrFailed);
+ };
+
+ ThreadPool pool;
+
+ processGraph<StorePath>(pool,
+ storePathsToAdd,
+
+ [&](const StorePath & path) {
+ auto & [info, source] = *infosMap.at(path);
+ /* auto storePathForDst = info.storePath; */
+ /* if (info->ca && info->references.empty()) { */
+ /* storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca); */
+ /* if (dstStore.storeDir == srcStore.storeDir) */
+ /* assert(storePathForDst == storePath); */
+ /* if (storePathForDst != storePath) */
+ /* debug("replaced path '%s' to '%s' for substituter '%s'", */
+ /* srcStore.printStorePath(storePath), */
+ /* dstStore.printStorePath(storePathForDst), */
+ /* dstStore.getUri()); */
+ /* } */
+ /* pathsMap.insert_or_assign(storePath, storePathForDst); */
+
+ if (isValidPath(info.path)) {
+ nrDone++;
+ showProgress();
+ return StorePathSet();
+ }
+
+ bytesExpected += info.narSize;
+ act.setExpected(actCopyPath, bytesExpected);
+
+ return info.references;
+ },
+
+ [&](const StorePath & path) {
+ checkInterrupt();
+
+ auto & [info, source] = *infosMap.at(path);
+
+ if (!isValidPath(info.path)) {
+ MaintainCount<decltype(nrRunning)> mc(nrRunning);
+ showProgress();
+ try {
+ addToStore(info, *source, repair, checkSigs);
+ } catch (Error &e) {
+ nrFailed++;
+ if (!settings.keepGoing)
+ throw e;
+ printMsg(lvlError, "could not copy %s: %s", printStorePath(path), e.what());
+ showProgress();
+ return;
+ }
+ }
+
+ nrDone++;
+ showProgress();
+ });
+}
void Store::addMultipleToStore(
Source & source,
@@ -998,106 +1078,39 @@ std::map<StorePath, StorePath> copyPaths(
Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size()));
- auto sorted = srcStore.topoSortPaths(missing);
- std::reverse(sorted.begin(), sorted.end());
-
- auto source = sinkToSource([&](Sink & sink) {
- sink << sorted.size();
- for (auto & storePath : sorted) {
- auto srcUri = srcStore.getUri();
- auto dstUri = dstStore.getUri();
- auto storePathS = srcStore.printStorePath(storePath);
- Activity act(*logger, lvlInfo, actCopyPath,
- makeCopyPathMessage(srcUri, dstUri, storePathS),
- {storePathS, srcUri, dstUri});
- PushActivity pact(act.id);
-
- auto info = srcStore.queryPathInfo(storePath);
- info->write(sink, srcStore, 16);
- srcStore.narFromPath(storePath, sink);
- }
- });
+ /* auto sorted = srcStore.topoSortPaths(missing); */
+ /* std::reverse(sorted.begin(), sorted.end()); */
+
+ /* auto source = sinkToSource([&](Sink & sink) { */
+ /* sink << sorted.size(); */
+ /* for (auto & storePath : sorted) { */
+ /* auto srcUri = srcStore.getUri(); */
+ /* auto dstUri = dstStore.getUri(); */
+ /* auto storePathS = srcStore.printStorePath(storePath); */
+ /* Activity act(*logger, lvlInfo, actCopyPath, */
+ /* makeCopyPathMessage(srcUri, dstUri, storePathS), */
+ /* {storePathS, srcUri, dstUri}); */
+ /* PushActivity pact(act.id); */
+
+ /* auto info = srcStore.queryPathInfo(storePath); */
+ /* info->write(sink, srcStore, 16); */
+ /* srcStore.narFromPath(storePath, sink); */
+ /* } */
+ /* }); */
+
+ std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> pathsToCopy;
+
+ for (auto & missingPath : missing) {
+ auto info = srcStore.queryPathInfo(missingPath);
+ auto source = sinkToSource([&](Sink & sink) {
+ srcStore.narFromPath(missingPath, sink);
+ });
+ pathsToCopy.push_back(std::pair{*info, std::move(source)});
+ }
- dstStore.addMultipleToStore(*source, repair, checkSigs);
+ dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs);
#if 0
- std::atomic<size_t> nrDone{0};
- std::atomic<size_t> nrFailed{0};
- std::atomic<uint64_t> bytesExpected{0};
- std::atomic<uint64_t> nrRunning{0};
-
- auto showProgress = [&]() {
- act.progress(nrDone, missing.size(), nrRunning, nrFailed);
- };
-
- ThreadPool pool;
-
- processGraph<StorePath>(pool,
- StorePathSet(missing.begin(), missing.end()),
-
- [&](const StorePath & storePath) {
- auto info = srcStore.queryPathInfo(storePath);
- auto storePathForDst = storePath;
- if (info->ca && info->references.empty()) {
- storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
- if (dstStore.storeDir == srcStore.storeDir)
- assert(storePathForDst == storePath);
- if (storePathForDst != storePath)
- debug("replaced path '%s' to '%s' for substituter '%s'",
- srcStore.printStorePath(storePath),
- dstStore.printStorePath(storePathForDst),
- dstStore.getUri());
- }
- pathsMap.insert_or_assign(storePath, storePathForDst);
-
- if (dstStore.isValidPath(storePath)) {
- nrDone++;
- showProgress();
- return StorePathSet();
- }
-
- bytesExpected += info->narSize;
- act.setExpected(actCopyPath, bytesExpected);
-
- return info->references;
- },
-
- [&](const StorePath & storePath) {
- checkInterrupt();
-
- auto info = srcStore.queryPathInfo(storePath);
-
- auto storePathForDst = storePath;
- if (info->ca && info->references.empty()) {
- storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca);
- if (dstStore.storeDir == srcStore.storeDir)
- assert(storePathForDst == storePath);
- if (storePathForDst != storePath)
- debug("replaced path '%s' to '%s' for substituter '%s'",
- srcStore.printStorePath(storePath),
- dstStore.printStorePath(storePathForDst),
- dstStore.getUri());
- }
- pathsMap.insert_or_assign(storePath, storePathForDst);
-
- if (!dstStore.isValidPath(storePathForDst)) {
- MaintainCount<decltype(nrRunning)> mc(nrRunning);
- showProgress();
- try {
- copyStorePath(srcStore, dstStore, storePath, repair, checkSigs);
- } catch (Error &e) {
- nrFailed++;
- if (!settings.keepGoing)
- throw e;
- printMsg(lvlError, "could not copy %s: %s", dstStore.printStorePath(storePath), e.what());
- showProgress();
- return;
- }
- }
-
- nrDone++;
- showProgress();
- });
#endif
return pathsMap;
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index 0c8a4db56..d934979cf 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -1,5 +1,6 @@
#pragma once
+#include "nar-info.hh"
#include "realisation.hh"
#include "path.hh"
#include "derived-path.hh"
@@ -364,6 +365,12 @@ public:
Source & source,
RepairFlag repair = NoRepair,
CheckSigsFlag checkSigs = CheckSigs);
+ virtual void addMultipleToStore(
+ std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy,
+ Activity & act,
+ RepairFlag repair = NoRepair,
+ CheckSigsFlag checkSigs = CheckSigs
+ );
/* Copy the contents of a path to the store and register the
validity the resulting path. The resulting path is returned.