diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/async-collect.hh | 101 | ||||
-rw-r--r-- | src/libutil/async-semaphore.hh | 122 | ||||
-rw-r--r-- | src/libutil/fmt.hh | 14 | ||||
-rw-r--r-- | src/libutil/meson.build | 2 |
4 files changed, 234 insertions, 5 deletions
diff --git a/src/libutil/async-collect.hh b/src/libutil/async-collect.hh new file mode 100644 index 000000000..9e0b8bad9 --- /dev/null +++ b/src/libutil/async-collect.hh @@ -0,0 +1,101 @@ +#pragma once +/// @file + +#include <kj/async.h> +#include <kj/common.h> +#include <kj/vector.h> +#include <list> +#include <optional> +#include <type_traits> + +namespace nix { + +template<typename K, typename V> +class AsyncCollect +{ +public: + using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>; + +private: + kj::ForkedPromise<void> allPromises; + std::list<Item> results; + size_t remaining; + + kj::ForkedPromise<void> signal; + kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify; + + void oneDone(Item item) + { + results.emplace_back(std::move(item)); + remaining -= 1; + KJ_IF_MAYBE (n, notify) { + (*n)->fulfill(); + notify = nullptr; + } + } + + kj::Promise<void> collectorFor(K key, kj::Promise<V> promise) + { + if constexpr (std::is_void_v<V>) { + return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); }); + } else { + return promise.then([this, key{std::move(key)}](V v) { + oneDone(Item{std::move(key), std::move(v)}); + }); + } + } + + kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises) + { + kj::Vector<kj::Promise<void>> wrappers; + for (auto & [key, promise] : promises) { + wrappers.add(collectorFor(std::move(key), std::move(promise))); + } + + return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork(); + } + +public: + AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises) + : allPromises(waitForAll(promises)) + , remaining(promises.size()) + , signal{nullptr} + { + } + + kj::Promise<std::optional<Item>> next() + { + if (remaining == 0 && results.empty()) { + return {std::nullopt}; + } + + if (!results.empty()) { + auto result = std::move(results.front()); + results.pop_front(); + return {{std::move(result)}}; + } + + if (notify == nullptr) { + auto pair = kj::newPromiseAndFulfiller<void>(); + notify = std::move(pair.fulfiller); + signal = pair.promise.fork(); + } + + return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] { + return next(); + }); + } +}; + +/** + * Collect the results of a list of promises, in order of completion. + * Once any input promise is rejected all promises that have not been + * resolved or rejected will be cancelled and the exception rethrown. + */ +template<typename K, typename V> +AsyncCollect<K, V> asyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> promises) +{ + return AsyncCollect<K, V>(std::move(promises)); +} + +} diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh new file mode 100644 index 000000000..f8db31a68 --- /dev/null +++ b/src/libutil/async-semaphore.hh @@ -0,0 +1,122 @@ +#pragma once +/// @file +/// @brief A semaphore implementation usable from within a KJ event loop. + +#include <cassert> +#include <kj/async.h> +#include <kj/common.h> +#include <kj/exception.h> +#include <kj/list.h> +#include <kj/source-location.h> +#include <memory> +#include <optional> + +namespace nix { + +class AsyncSemaphore +{ +public: + class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token + { + struct Release + { + void operator()(AsyncSemaphore * sem) const + { + sem->unsafeRelease(); + } + }; + + std::unique_ptr<AsyncSemaphore, Release> parent; + + public: + Token() = default; + Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {} + + bool valid() const + { + return parent != nullptr; + } + }; + +private: + struct Waiter + { + kj::PromiseFulfiller<Token> & fulfiller; + kj::ListLink<Waiter> link; + kj::List<Waiter, &Waiter::link> & list; + + Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list) + : fulfiller(fulfiller) + , list(list) + { + list.add(*this); + } + + ~Waiter() + { + if (link.isLinked()) { + list.remove(*this); + } + } + }; + + const unsigned capacity_; + unsigned used_ = 0; + kj::List<Waiter, &Waiter::link> waiters; + + void unsafeRelease() + { + used_ -= 1; + while (used_ < capacity_ && !waiters.empty()) { + used_ += 1; + auto & w = waiters.front(); + w.fulfiller.fulfill(Token{*this, {}}); + waiters.remove(w); + } + } + +public: + explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} + + KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); + + ~AsyncSemaphore() + { + assert(waiters.empty() && "destroyed a semaphore with active waiters"); + } + + std::optional<Token> tryAcquire() + { + if (used_ < capacity_) { + used_ += 1; + return Token{*this, {}}; + } else { + return {}; + } + } + + kj::Promise<Token> acquire() + { + if (auto t = tryAcquire()) { + return std::move(*t); + } else { + return kj::newAdaptedPromise<Token, Waiter>(waiters); + } + } + + unsigned capacity() const + { + return capacity_; + } + + unsigned used() const + { + return used_; + } + + unsigned available() const + { + return capacity_ - used_; + } +}; +} diff --git a/src/libutil/fmt.hh b/src/libutil/fmt.hh index ee3e1e2e7..5feefdf90 100644 --- a/src/libutil/fmt.hh +++ b/src/libutil/fmt.hh @@ -136,11 +136,17 @@ inline std::string fmt(const char * s) template<typename... Args> inline std::string fmt(const std::string & fs, const Args &... args) -{ +try { boost::format f(fs); fmt_internal::setExceptions(f); (f % ... % args); return f.str(); +} catch (boost::io::format_error & fe) { + // I don't care who catches this, we do not put up with boost format errors + // Give me a stack trace and a core dump + std::cerr << "nix::fmt threw format error. Original format string: '"; + std::cerr << fs << "'; number of arguments: " << sizeof...(args) << "\n"; + std::terminate(); } /** @@ -174,15 +180,13 @@ public: std::cerr << "HintFmt received incorrect number of format args. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; // And regardless of the coredump give me a damn stacktrace. - printStackTrace(); - abort(); + std::terminate(); } } catch (boost::io::format_error & ex) { // Same thing, but for anything that happens in the member initializers. std::cerr << "HintFmt received incorrect format string. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; - printStackTrace(); - abort(); + std::terminate(); } HintFmt(const HintFmt & hf) : fmt(hf.fmt) {} diff --git a/src/libutil/meson.build b/src/libutil/meson.build index a3f21de59..afca4e021 100644 --- a/src/libutil/meson.build +++ b/src/libutil/meson.build @@ -53,6 +53,8 @@ libutil_headers = files( 'archive.hh', 'args/root.hh', 'args.hh', + 'async-collect.hh', + 'async-semaphore.hh', 'backed-string-view.hh', 'box_ptr.hh', 'canon-path.hh', |