aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/misc.cc143
-rw-r--r--src/libutil/closure.hh69
-rw-r--r--src/libutil/tests/closure.cc70
3 files changed, 198 insertions, 84 deletions
diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc
index a99a2fc78..bc5fd968c 100644
--- a/src/libstore/misc.cc
+++ b/src/libstore/misc.cc
@@ -6,98 +6,73 @@
#include "thread-pool.hh"
#include "topo-sort.hh"
#include "callback.hh"
+#include "closure.hh"
namespace nix {
-
void Store::computeFSClosure(const StorePathSet & startPaths,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
- struct State
- {
- size_t pending;
- StorePathSet & paths;
- std::exception_ptr exc;
- };
-
- Sync<State> state_(State{0, paths_, 0});
-
- std::function<void(const StorePath &)> enqueue;
-
- std::condition_variable done;
-
- enqueue = [&](const StorePath & path) -> void {
- {
- auto state(state_.lock());
- if (state->exc) return;
- if (!state->paths.insert(path).second) return;
- state->pending++;
- }
-
- queryPathInfo(path, {[&](std::future<ref<const ValidPathInfo>> fut) {
- // FIXME: calls to isValidPath() should be async
-
- try {
- auto info = fut.get();
-
- if (flipDirection) {
-
- StorePathSet referrers;
- queryReferrers(path, referrers);
- for (auto & ref : referrers)
- if (ref != path)
- enqueue(ref);
-
- if (includeOutputs)
- for (auto & i : queryValidDerivers(path))
- enqueue(i);
-
- if (includeDerivers && path.isDerivation())
- for (auto & i : queryDerivationOutputs(path))
- if (isValidPath(i) && queryPathInfo(i)->deriver == path)
- enqueue(i);
-
- } else {
-
- for (auto & ref : info->references)
- if (ref != path)
- enqueue(ref);
-
- if (includeOutputs && path.isDerivation())
- for (auto & i : queryDerivationOutputs(path))
- if (isValidPath(i)) enqueue(i);
-
- if (includeDerivers && info->deriver && isValidPath(*info->deriver))
- enqueue(*info->deriver);
-
- }
-
- {
- auto state(state_.lock());
- assert(state->pending);
- if (!--state->pending) done.notify_one();
- }
-
- } catch (...) {
- auto state(state_.lock());
- if (!state->exc) state->exc = std::current_exception();
- assert(state->pending);
- if (!--state->pending) done.notify_one();
- };
- }});
- };
-
- for (auto & startPath : startPaths)
- enqueue(startPath);
-
- {
- auto state(state_.lock());
- while (state->pending) state.wait(done);
- if (state->exc) std::rethrow_exception(state->exc);
- }
+ std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
+ if (flipDirection)
+ queryDeps = [&](const StorePath& path,
+ std::future<ref<const ValidPathInfo>> & fut) {
+ StorePathSet res;
+ StorePathSet referrers;
+ queryReferrers(path, referrers);
+ for (auto& ref : referrers)
+ if (ref != path)
+ res.insert(ref);
+
+ if (includeOutputs)
+ for (auto& i : queryValidDerivers(path))
+ res.insert(i);
+
+ if (includeDerivers && path.isDerivation())
+ for (auto& i : queryDerivationOutputs(path))
+ if (isValidPath(i) && queryPathInfo(i)->deriver == path)
+ res.insert(i);
+ return res;
+ };
+ else
+ queryDeps = [&](const StorePath& path,
+ std::future<ref<const ValidPathInfo>> & fut) {
+ StorePathSet res;
+ auto info = fut.get();
+ for (auto& ref : info->references)
+ if (ref != path)
+ res.insert(ref);
+
+ if (includeOutputs && path.isDerivation())
+ for (auto& i : queryDerivationOutputs(path))
+ if (isValidPath(i))
+ res.insert(i);
+
+ if (includeDerivers && info->deriver && isValidPath(*info->deriver))
+ res.insert(*info->deriver);
+ return res;
+ };
+
+ computeClosure<StorePath>(
+ startPaths, paths_,
+ [&](const StorePath& path,
+ std::function<void(std::promise<std::set<StorePath>>&)>
+ processEdges) {
+ std::promise<std::set<StorePath>> promise;
+ std::function<void(std::future<ref<const ValidPathInfo>>)>
+ getDependencies =
+ [&](std::future<ref<const ValidPathInfo>> fut) {
+ try {
+ promise.set_value(queryDeps(path, fut));
+ } catch (...) {
+ promise.set_exception(std::current_exception());
+ }
+ };
+ queryPathInfo(path, getDependencies);
+ processEdges(promise);
+ });
}
-
void Store::computeFSClosure(const StorePath & startPath,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
diff --git a/src/libutil/closure.hh b/src/libutil/closure.hh
new file mode 100644
index 000000000..779b9b2d5
--- /dev/null
+++ b/src/libutil/closure.hh
@@ -0,0 +1,69 @@
+#include <set>
+#include <future>
+#include "sync.hh"
+
+using std::set;
+
+namespace nix {
+
+template<typename T>
+using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
+
+template<typename T>
+void computeClosure(
+ const set<T> startElts,
+ set<T> & res,
+ GetEdgesAsync<T> getEdgesAsync
+)
+{
+ struct State
+ {
+ size_t pending;
+ set<T> & res;
+ std::exception_ptr exc;
+ };
+
+ Sync<State> state_(State{0, res, 0});
+
+ std::function<void(const T &)> enqueue;
+
+ std::condition_variable done;
+
+ enqueue = [&](const T & current) -> void {
+ {
+ auto state(state_.lock());
+ if (state->exc) return;
+ if (!state->res.insert(current).second) return;
+ state->pending++;
+ }
+
+ getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
+ try {
+ auto children = prom.get_future().get();
+ for (auto & child : children)
+ enqueue(child);
+ {
+ auto state(state_.lock());
+ assert(state->pending);
+ if (!--state->pending) done.notify_one();
+ }
+ } catch (...) {
+ auto state(state_.lock());
+ if (!state->exc) state->exc = std::current_exception();
+ assert(state->pending);
+ if (!--state->pending) done.notify_one();
+ };
+ });
+ };
+
+ for (auto & startElt : startElts)
+ enqueue(startElt);
+
+ {
+ auto state(state_.lock());
+ while (state->pending) state.wait(done);
+ if (state->exc) std::rethrow_exception(state->exc);
+ }
+}
+
+}
diff --git a/src/libutil/tests/closure.cc b/src/libutil/tests/closure.cc
new file mode 100644
index 000000000..7597e7807
--- /dev/null
+++ b/src/libutil/tests/closure.cc
@@ -0,0 +1,70 @@
+#include "closure.hh"
+#include <gtest/gtest.h>
+
+namespace nix {
+
+using namespace std;
+
+map<string, set<string>> testGraph = {
+ { "A", { "B", "C", "G" } },
+ { "B", { "A" } }, // Loops back to A
+ { "C", { "F" } }, // Indirect reference
+ { "D", { "A" } }, // Not reachable, but has backreferences
+ { "E", {} }, // Just not reachable
+ { "F", {} },
+ { "G", { "G" } }, // Self reference
+};
+
+TEST(closure, correctClosure) {
+ set<string> aClosure;
+ set<string> expectedClosure = {"A", "B", "C", "F", "G"};
+ computeClosure<string>(
+ {"A"},
+ aClosure,
+ [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
+ promise<set<string>> promisedNodes;
+ promisedNodes.set_value(testGraph[currentNode]);
+ processEdges(promisedNodes);
+ }
+ );
+
+ ASSERT_EQ(aClosure, expectedClosure);
+}
+
+TEST(closure, properlyHandlesDirectExceptions) {
+ struct TestExn {};
+ set<string> aClosure;
+ EXPECT_THROW(
+ computeClosure<string>(
+ {"A"},
+ aClosure,
+ [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
+ throw TestExn();
+ }
+ ),
+ TestExn
+ );
+}
+
+TEST(closure, properlyHandlesExceptionsInPromise) {
+ struct TestExn {};
+ set<string> aClosure;
+ EXPECT_THROW(
+ computeClosure<string>(
+ {"A"},
+ aClosure,
+ [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
+ promise<set<string>> promise;
+ try {
+ throw TestExn();
+ } catch (...) {
+ promise.set_exception(std::current_exception());
+ }
+ processEdges(promise);
+ }
+ ),
+ TestExn
+ );
+}
+
+}