aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libutil/thread-pool.hh75
1 files changed, 47 insertions, 28 deletions
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index b64dc52d4..361a9d33a 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -70,50 +70,69 @@ void processGraph(
struct Graph {
std::set<T> left;
std::map<T, std::set<T>> refs, rrefs;
- std::function<void(T)> wrap;
};
- ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>();
+ Sync<Graph> graph_(Graph{nodes, {}, {}});
- auto wrapWork = [&pool, graph_, processNode](const T & node) {
+ std::function<void(const T &)> worker;
+
+ worker = [&](const T & node) {
+
+ {
+ auto graph(graph_.lock());
+ auto i = graph->refs.find(node);
+ if (i == graph->refs.end())
+ goto getRefs;
+ goto doWork;
+ }
+
+ getRefs:
+ {
+ auto refs = getEdges(node);
+ refs.erase(node);
+
+ {
+ auto graph(graph_.lock());
+ for (auto & ref : refs)
+ if (graph->left.count(ref)) {
+ graph->refs[node].insert(ref);
+ graph->rrefs[ref].insert(node);
+ }
+ if (graph->refs[node].empty())
+ goto doWork;
+ }
+ }
+
+ return;
+
+ doWork:
processNode(node);
- /* Enqueue work for all nodes that were waiting on this one. */
+ /* Enqueue work for all nodes that were waiting on this one
+ and have no unprocessed dependencies. */
{
- auto graph(graph_->lock());
- graph->left.erase(node);
+ auto graph(graph_.lock());
for (auto & rref : graph->rrefs[node]) {
auto & refs(graph->refs[rref]);
auto i = refs.find(node);
assert(i != refs.end());
refs.erase(i);
if (refs.empty())
- pool.enqueue(std::bind(graph->wrap, rref));
+ pool.enqueue(std::bind(worker, rref));
}
+ graph->left.erase(node);
+ graph->refs.erase(node);
+ graph->rrefs.erase(node);
}
};
- {
- auto graph(graph_->lock());
- graph->left = nodes;
- graph->wrap = wrapWork;
- }
-
- /* Build the dependency graph; enqueue all nodes with no
- dependencies. */
- for (auto & node : nodes) {
- auto refs = getEdges(node);
- {
- auto graph(graph_->lock());
- for (auto & ref : refs)
- if (ref != node && graph->left.count(ref)) {
- graph->refs[node].insert(ref);
- graph->rrefs[ref].insert(node);
- }
- if (graph->refs[node].empty())
- pool.enqueue(std::bind(graph->wrap, node));
- }
- }
+ for (auto & node : nodes)
+ pool.enqueue(std::bind(worker, std::ref(node)));
+
+ pool.process();
+
+ if (!graph_.lock()->left.empty())
+ throw Error("graph processing incomplete (cyclic reference?)");
}
}