#pragma once /// @file /// @brief A semaphore implementation usable from within a KJ event loop. #include #include #include #include #include #include #include #include namespace nix { class AsyncSemaphore { public: class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token { struct Release { void operator()(AsyncSemaphore * sem) const { sem->unsafeRelease(); } }; std::unique_ptr parent; public: Token() = default; Token(AsyncSemaphore & parent, kj::Badge) : parent(&parent) {} bool valid() const { return parent != nullptr; } }; private: struct Waiter { kj::PromiseFulfiller & fulfiller; kj::ListLink link; kj::List & list; Waiter(kj::PromiseFulfiller & fulfiller, kj::List & list) : fulfiller(fulfiller) , list(list) { list.add(*this); } ~Waiter() { if (link.isLinked()) { list.remove(*this); } } }; const unsigned capacity_; unsigned used_ = 0; kj::List waiters; void unsafeRelease() { used_ -= 1; while (used_ < capacity_ && !waiters.empty()) { used_ += 1; auto & w = waiters.front(); w.fulfiller.fulfill(Token{*this, {}}); waiters.remove(w); } } public: explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); ~AsyncSemaphore() { assert(waiters.empty() && "destroyed a semaphore with active waiters"); } std::optional tryAcquire() { if (used_ < capacity_) { used_ += 1; return Token{*this, {}}; } else { return {}; } } kj::Promise acquire() { if (auto t = tryAcquire()) { return std::move(*t); } else { return kj::newAdaptedPromise(waiters); } } unsigned capacity() const { return capacity_; } unsigned used() const { return used_; } unsigned available() const { return capacity_ - used_; } }; }