#pragma once /// @file #include #include #include #include #include #include namespace nix { template class AsyncCollect { public: using Item = std::conditional_t, K, std::pair>; private: kj::ForkedPromise allPromises; std::list results; size_t remaining; kj::ForkedPromise signal; kj::Maybe>> notify; void oneDone(Item item) { results.emplace_back(std::move(item)); remaining -= 1; KJ_IF_MAYBE (n, notify) { (*n)->fulfill(); notify = nullptr; } } kj::Promise collectorFor(K key, kj::Promise promise) { if constexpr (std::is_void_v) { return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); }); } else { return promise.then([this, key{std::move(key)}](V v) { oneDone(Item{std::move(key), std::move(v)}); }); } } kj::ForkedPromise waitForAll(kj::Array>> & promises) { kj::Vector> wrappers; for (auto & [key, promise] : promises) { wrappers.add(collectorFor(std::move(key), std::move(promise))); } return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork(); } public: AsyncCollect(kj::Array>> && promises) : allPromises(waitForAll(promises)) , remaining(promises.size()) , signal{nullptr} { } kj::Promise> next() { if (remaining == 0 && results.empty()) { return {std::nullopt}; } if (!results.empty()) { auto result = std::move(results.front()); results.pop_front(); return {{std::move(result)}}; } if (notify == nullptr) { auto pair = kj::newPromiseAndFulfiller(); notify = std::move(pair.fulfiller); signal = pair.promise.fork(); } return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] { return next(); }); } }; /** * Collect the results of a list of promises, in order of completion. * Once any input promise is rejected all promises that have not been * resolved or rejected will be cancelled and the exception rethrown. */ template AsyncCollect asyncCollect(kj::Array>> promises) { return AsyncCollect(std::move(promises)); } }