diff options
author | Théophane Hufschmitt <theophane.hufschmitt@tweag.io> | 2022-06-03 17:01:16 +0200 |
---|---|---|
committer | Théophane Hufschmitt <theophane.hufschmitt@tweag.io> | 2022-06-03 17:01:16 +0200 |
commit | 95f47c28fb8786f8d8d529192465bb6ec20db46b (patch) | |
tree | b471a7f9387106a8e48723c253d138853b2a6d64 | |
parent | 1dd7253133c4dfd2e7a16ad6fe505442cef38a5b (diff) |
Make nix copy parallel again
FILLME
-rw-r--r-- | src/libstore/store-api.cc | 207 | ||||
-rw-r--r-- | src/libstore/store-api.hh | 7 |
2 files changed, 117 insertions, 97 deletions
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 8861274a2..008451666 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -258,6 +258,86 @@ StorePath Store::addToStore( return addToStoreFromDump(*source, name, method, hashAlgo, repair, references); } +void Store::addMultipleToStore( + std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy, + Activity & act, + RepairFlag repair, + CheckSigsFlag checkSigs) +{ + std::atomic<size_t> nrDone{0}; + std::atomic<size_t> nrFailed{0}; + std::atomic<uint64_t> bytesExpected{0}; + std::atomic<uint64_t> nrRunning{0}; + + using PathWithInfo = std::pair<ValidPathInfo, std::unique_ptr<Source>>; + + std::map<StorePath, PathWithInfo*> infosMap; + StorePathSet storePathsToAdd; + for (auto & thingToAdd : pathsToCopy) { + infosMap.insert_or_assign(thingToAdd.first.path, &thingToAdd); + storePathsToAdd.insert(thingToAdd.first.path); + } + + auto showProgress = [&]() { + act.progress(nrDone, pathsToCopy.size(), nrRunning, nrFailed); + }; + + ThreadPool pool; + + processGraph<StorePath>(pool, + storePathsToAdd, + + [&](const StorePath & path) { + auto & [info, source] = *infosMap.at(path); + /* auto storePathForDst = info.storePath; */ + /* if (info->ca && info->references.empty()) { */ + /* storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca); */ + /* if (dstStore.storeDir == srcStore.storeDir) */ + /* assert(storePathForDst == storePath); */ + /* if (storePathForDst != storePath) */ + /* debug("replaced path '%s' to '%s' for substituter '%s'", */ + /* srcStore.printStorePath(storePath), */ + /* dstStore.printStorePath(storePathForDst), */ + /* dstStore.getUri()); */ + /* } */ + /* pathsMap.insert_or_assign(storePath, storePathForDst); */ + + if (isValidPath(info.path)) { + nrDone++; + showProgress(); + return StorePathSet(); + } + + bytesExpected += info.narSize; + act.setExpected(actCopyPath, bytesExpected); + + return info.references; + }, + + [&](const StorePath & path) { + checkInterrupt(); + + auto & [info, source] = *infosMap.at(path); + + if (!isValidPath(info.path)) { + MaintainCount<decltype(nrRunning)> mc(nrRunning); + showProgress(); + try { + addToStore(info, *source, repair, checkSigs); + } catch (Error &e) { + nrFailed++; + if (!settings.keepGoing) + throw e; + printMsg(lvlError, "could not copy %s: %s", printStorePath(path), e.what()); + showProgress(); + return; + } + } + + nrDone++; + showProgress(); + }); +} void Store::addMultipleToStore( Source & source, @@ -998,106 +1078,39 @@ std::map<StorePath, StorePath> copyPaths( Activity act(*logger, lvlInfo, actCopyPaths, fmt("copying %d paths", missing.size())); - auto sorted = srcStore.topoSortPaths(missing); - std::reverse(sorted.begin(), sorted.end()); - - auto source = sinkToSource([&](Sink & sink) { - sink << sorted.size(); - for (auto & storePath : sorted) { - auto srcUri = srcStore.getUri(); - auto dstUri = dstStore.getUri(); - auto storePathS = srcStore.printStorePath(storePath); - Activity act(*logger, lvlInfo, actCopyPath, - makeCopyPathMessage(srcUri, dstUri, storePathS), - {storePathS, srcUri, dstUri}); - PushActivity pact(act.id); - - auto info = srcStore.queryPathInfo(storePath); - info->write(sink, srcStore, 16); - srcStore.narFromPath(storePath, sink); - } - }); + /* auto sorted = srcStore.topoSortPaths(missing); */ + /* std::reverse(sorted.begin(), sorted.end()); */ + + /* auto source = sinkToSource([&](Sink & sink) { */ + /* sink << sorted.size(); */ + /* for (auto & storePath : sorted) { */ + /* auto srcUri = srcStore.getUri(); */ + /* auto dstUri = dstStore.getUri(); */ + /* auto storePathS = srcStore.printStorePath(storePath); */ + /* Activity act(*logger, lvlInfo, actCopyPath, */ + /* makeCopyPathMessage(srcUri, dstUri, storePathS), */ + /* {storePathS, srcUri, dstUri}); */ + /* PushActivity pact(act.id); */ + + /* auto info = srcStore.queryPathInfo(storePath); */ + /* info->write(sink, srcStore, 16); */ + /* srcStore.narFromPath(storePath, sink); */ + /* } */ + /* }); */ + + std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> pathsToCopy; + + for (auto & missingPath : missing) { + auto info = srcStore.queryPathInfo(missingPath); + auto source = sinkToSource([&](Sink & sink) { + srcStore.narFromPath(missingPath, sink); + }); + pathsToCopy.push_back(std::pair{*info, std::move(source)}); + } - dstStore.addMultipleToStore(*source, repair, checkSigs); + dstStore.addMultipleToStore(pathsToCopy, act, repair, checkSigs); #if 0 - std::atomic<size_t> nrDone{0}; - std::atomic<size_t> nrFailed{0}; - std::atomic<uint64_t> bytesExpected{0}; - std::atomic<uint64_t> nrRunning{0}; - - auto showProgress = [&]() { - act.progress(nrDone, missing.size(), nrRunning, nrFailed); - }; - - ThreadPool pool; - - processGraph<StorePath>(pool, - StorePathSet(missing.begin(), missing.end()), - - [&](const StorePath & storePath) { - auto info = srcStore.queryPathInfo(storePath); - auto storePathForDst = storePath; - if (info->ca && info->references.empty()) { - storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca); - if (dstStore.storeDir == srcStore.storeDir) - assert(storePathForDst == storePath); - if (storePathForDst != storePath) - debug("replaced path '%s' to '%s' for substituter '%s'", - srcStore.printStorePath(storePath), - dstStore.printStorePath(storePathForDst), - dstStore.getUri()); - } - pathsMap.insert_or_assign(storePath, storePathForDst); - - if (dstStore.isValidPath(storePath)) { - nrDone++; - showProgress(); - return StorePathSet(); - } - - bytesExpected += info->narSize; - act.setExpected(actCopyPath, bytesExpected); - - return info->references; - }, - - [&](const StorePath & storePath) { - checkInterrupt(); - - auto info = srcStore.queryPathInfo(storePath); - - auto storePathForDst = storePath; - if (info->ca && info->references.empty()) { - storePathForDst = dstStore.makeFixedOutputPathFromCA(storePath.name(), *info->ca); - if (dstStore.storeDir == srcStore.storeDir) - assert(storePathForDst == storePath); - if (storePathForDst != storePath) - debug("replaced path '%s' to '%s' for substituter '%s'", - srcStore.printStorePath(storePath), - dstStore.printStorePath(storePathForDst), - dstStore.getUri()); - } - pathsMap.insert_or_assign(storePath, storePathForDst); - - if (!dstStore.isValidPath(storePathForDst)) { - MaintainCount<decltype(nrRunning)> mc(nrRunning); - showProgress(); - try { - copyStorePath(srcStore, dstStore, storePath, repair, checkSigs); - } catch (Error &e) { - nrFailed++; - if (!settings.keepGoing) - throw e; - printMsg(lvlError, "could not copy %s: %s", dstStore.printStorePath(storePath), e.what()); - showProgress(); - return; - } - } - - nrDone++; - showProgress(); - }); #endif return pathsMap; diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index 0c8a4db56..d934979cf 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -1,5 +1,6 @@ #pragma once +#include "nar-info.hh" #include "realisation.hh" #include "path.hh" #include "derived-path.hh" @@ -364,6 +365,12 @@ public: Source & source, RepairFlag repair = NoRepair, CheckSigsFlag checkSigs = CheckSigs); + virtual void addMultipleToStore( + std::vector<std::pair<ValidPathInfo, std::unique_ptr<Source>>> & pathsToCopy, + Activity & act, + RepairFlag repair = NoRepair, + CheckSigsFlag checkSigs = CheckSigs + ); /* Copy the contents of a path to the store and register the validity the resulting path. The resulting path is returned. |