blob: 779b9b2d54a3e20ce2836d84ebbafeb056f6fe85 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#include <set>
#include <future>
#include "sync.hh"
using std::set;
namespace nix {
template<typename T>
using GetEdgesAsync = std::function<void(const T &, std::function<void(std::promise<set<T>> &)>)>;
template<typename T>
void computeClosure(
const set<T> startElts,
set<T> & res,
GetEdgesAsync<T> getEdgesAsync
)
{
struct State
{
size_t pending;
set<T> & res;
std::exception_ptr exc;
};
Sync<State> state_(State{0, res, 0});
std::function<void(const T &)> enqueue;
std::condition_variable done;
enqueue = [&](const T & current) -> void {
{
auto state(state_.lock());
if (state->exc) return;
if (!state->res.insert(current).second) return;
state->pending++;
}
getEdgesAsync(current, [&](std::promise<set<T>> & prom) {
try {
auto children = prom.get_future().get();
for (auto & child : children)
enqueue(child);
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
} catch (...) {
auto state(state_.lock());
if (!state->exc) state->exc = std::current_exception();
assert(state->pending);
if (!--state->pending) done.notify_one();
};
});
};
for (auto & startElt : startElts)
enqueue(startElt);
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
}
}
}
|