diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/async-collect.hh | 101 | ||||
-rw-r--r-- | src/libutil/async-semaphore.hh | 122 | ||||
-rw-r--r-- | src/libutil/current-process.cc | 2 | ||||
-rw-r--r-- | src/libutil/error.cc | 16 | ||||
-rw-r--r-- | src/libutil/error.hh | 23 | ||||
-rw-r--r-- | src/libutil/file-descriptor.cc | 2 | ||||
-rw-r--r-- | src/libutil/file-system.cc | 2 | ||||
-rw-r--r-- | src/libutil/fmt.hh | 14 | ||||
-rw-r--r-- | src/libutil/logging.cc | 2 | ||||
-rw-r--r-- | src/libutil/meson.build | 2 | ||||
-rw-r--r-- | src/libutil/processes.hh | 10 | ||||
-rw-r--r-- | src/libutil/serialise.cc | 2 | ||||
-rw-r--r-- | src/libutil/serialise.hh | 4 | ||||
-rw-r--r-- | src/libutil/signals.cc | 9 | ||||
-rw-r--r-- | src/libutil/signals.hh | 3 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 18 |
16 files changed, 304 insertions, 28 deletions
diff --git a/src/libutil/async-collect.hh b/src/libutil/async-collect.hh new file mode 100644 index 000000000..9e0b8bad9 --- /dev/null +++ b/src/libutil/async-collect.hh @@ -0,0 +1,101 @@ +#pragma once +/// @file + +#include <kj/async.h> +#include <kj/common.h> +#include <kj/vector.h> +#include <list> +#include <optional> +#include <type_traits> + +namespace nix { + +template<typename K, typename V> +class AsyncCollect +{ +public: + using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>; + +private: + kj::ForkedPromise<void> allPromises; + std::list<Item> results; + size_t remaining; + + kj::ForkedPromise<void> signal; + kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify; + + void oneDone(Item item) + { + results.emplace_back(std::move(item)); + remaining -= 1; + KJ_IF_MAYBE (n, notify) { + (*n)->fulfill(); + notify = nullptr; + } + } + + kj::Promise<void> collectorFor(K key, kj::Promise<V> promise) + { + if constexpr (std::is_void_v<V>) { + return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); }); + } else { + return promise.then([this, key{std::move(key)}](V v) { + oneDone(Item{std::move(key), std::move(v)}); + }); + } + } + + kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises) + { + kj::Vector<kj::Promise<void>> wrappers; + for (auto & [key, promise] : promises) { + wrappers.add(collectorFor(std::move(key), std::move(promise))); + } + + return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork(); + } + +public: + AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises) + : allPromises(waitForAll(promises)) + , remaining(promises.size()) + , signal{nullptr} + { + } + + kj::Promise<std::optional<Item>> next() + { + if (remaining == 0 && results.empty()) { + return {std::nullopt}; + } + + if (!results.empty()) { + auto result = std::move(results.front()); + results.pop_front(); + return {{std::move(result)}}; + } + + if (notify == nullptr) { + auto pair = kj::newPromiseAndFulfiller<void>(); + notify = std::move(pair.fulfiller); + signal = pair.promise.fork(); + } + + return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] { + return next(); + }); + } +}; + +/** + * Collect the results of a list of promises, in order of completion. + * Once any input promise is rejected all promises that have not been + * resolved or rejected will be cancelled and the exception rethrown. + */ +template<typename K, typename V> +AsyncCollect<K, V> asyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> promises) +{ + return AsyncCollect<K, V>(std::move(promises)); +} + +} diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh new file mode 100644 index 000000000..f8db31a68 --- /dev/null +++ b/src/libutil/async-semaphore.hh @@ -0,0 +1,122 @@ +#pragma once +/// @file +/// @brief A semaphore implementation usable from within a KJ event loop. + +#include <cassert> +#include <kj/async.h> +#include <kj/common.h> +#include <kj/exception.h> +#include <kj/list.h> +#include <kj/source-location.h> +#include <memory> +#include <optional> + +namespace nix { + +class AsyncSemaphore +{ +public: + class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token + { + struct Release + { + void operator()(AsyncSemaphore * sem) const + { + sem->unsafeRelease(); + } + }; + + std::unique_ptr<AsyncSemaphore, Release> parent; + + public: + Token() = default; + Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {} + + bool valid() const + { + return parent != nullptr; + } + }; + +private: + struct Waiter + { + kj::PromiseFulfiller<Token> & fulfiller; + kj::ListLink<Waiter> link; + kj::List<Waiter, &Waiter::link> & list; + + Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list) + : fulfiller(fulfiller) + , list(list) + { + list.add(*this); + } + + ~Waiter() + { + if (link.isLinked()) { + list.remove(*this); + } + } + }; + + const unsigned capacity_; + unsigned used_ = 0; + kj::List<Waiter, &Waiter::link> waiters; + + void unsafeRelease() + { + used_ -= 1; + while (used_ < capacity_ && !waiters.empty()) { + used_ += 1; + auto & w = waiters.front(); + w.fulfiller.fulfill(Token{*this, {}}); + waiters.remove(w); + } + } + +public: + explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} + + KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); + + ~AsyncSemaphore() + { + assert(waiters.empty() && "destroyed a semaphore with active waiters"); + } + + std::optional<Token> tryAcquire() + { + if (used_ < capacity_) { + used_ += 1; + return Token{*this, {}}; + } else { + return {}; + } + } + + kj::Promise<Token> acquire() + { + if (auto t = tryAcquire()) { + return std::move(*t); + } else { + return kj::newAdaptedPromise<Token, Waiter>(waiters); + } + } + + unsigned capacity() const + { + return capacity_; + } + + unsigned used() const + { + return used_; + } + + unsigned available() const + { + return capacity_ - used_; + } +}; +} diff --git a/src/libutil/current-process.cc b/src/libutil/current-process.cc index 33cda211b..3b3e46a9a 100644 --- a/src/libutil/current-process.cc +++ b/src/libutil/current-process.cc @@ -49,7 +49,7 @@ unsigned int getMaxCPU() auto period = cpuMaxParts[1]; if (quota != "max") return std::ceil(std::stoi(quota) / std::stof(period)); - } catch (Error &) { ignoreException(lvlDebug); } + } catch (Error &) { ignoreExceptionInDestructor(lvlDebug); } #endif return 0; diff --git a/src/libutil/error.cc b/src/libutil/error.cc index a7cbfbfd0..f57e3ef7d 100644 --- a/src/libutil/error.cc +++ b/src/libutil/error.cc @@ -4,6 +4,7 @@ #include "position.hh" #include "terminal.hh" #include "strings.hh" +#include "signals.hh" #include <iostream> #include <optional> @@ -132,7 +133,7 @@ static std::string indent(std::string_view indentFirst, std::string_view indentR /** * A development aid for finding missing positions, to improve error messages. Example use: * - * NIX_DEVELOPER_SHOW_UNKNOWN_LOCATIONS=1 _NIX_TEST_ACCEPT=1 make tests/lang.sh.test + * NIX_DEVELOPER_SHOW_UNKNOWN_LOCATIONS=1 _NIX_TEST_ACCEPT=1 just test --suite installcheck -v functional-lang * git diff -U20 tests * */ @@ -416,7 +417,7 @@ std::ostream & showErrorInfo(std::ostream & out, const ErrorInfo & einfo, bool s return out; } -void ignoreException(Verbosity lvl) +void ignoreExceptionInDestructor(Verbosity lvl) { /* Make sure no exceptions leave this function. printError() also throws when remote is closed. */ @@ -429,4 +430,15 @@ void ignoreException(Verbosity lvl) } catch (...) { } } +void ignoreExceptionExceptInterrupt(Verbosity lvl) +{ + try { + throw; + } catch (const Interrupted & e) { + throw; + } catch (std::exception & e) { + printMsg(lvl, "error (ignored): %1%", e.what()); + } +} + } diff --git a/src/libutil/error.hh b/src/libutil/error.hh index 73c1ccadd..885a2b218 100644 --- a/src/libutil/error.hh +++ b/src/libutil/error.hh @@ -70,17 +70,17 @@ inline bool operator<=(const Trace& lhs, const Trace& rhs); inline bool operator>=(const Trace& lhs, const Trace& rhs); struct ErrorInfo { - Verbosity level; + Verbosity level = Verbosity::lvlError; HintFmt msg; std::shared_ptr<Pos> pos; - std::list<Trace> traces; + std::list<Trace> traces = {}; /** * Exit status. */ unsigned int status = 1; - Suggestions suggestions; + Suggestions suggestions = {}; static std::optional<std::string> programName; }; @@ -204,7 +204,22 @@ public: /** * Exception handling in destructors: print an error message, then * ignore the exception. + * + * If you're not in a destructor, you usually want to use `ignoreExceptionExceptInterrupt()`. + * + * This function might also be used in callbacks whose caller may not handle exceptions, + * but ideally we propagate the exception using an exception_ptr in such cases. + * See e.g. `PackBuilderContext` + */ +void ignoreExceptionInDestructor(Verbosity lvl = lvlError); + +/** + * Not destructor-safe. + * Print an error message, then ignore the exception. + * If the exception is an `Interrupted` exception, rethrow it. + * + * This may be used in a few places where Interrupt can't happen, but that's ok. */ -void ignoreException(Verbosity lvl = lvlError); +void ignoreExceptionExceptInterrupt(Verbosity lvl = lvlError); } diff --git a/src/libutil/file-descriptor.cc b/src/libutil/file-descriptor.cc index 8385ea402..cbb2bb539 100644 --- a/src/libutil/file-descriptor.cc +++ b/src/libutil/file-descriptor.cc @@ -146,7 +146,7 @@ AutoCloseFD::~AutoCloseFD() try { close(); } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } diff --git a/src/libutil/file-system.cc b/src/libutil/file-system.cc index 1d3eba58f..c4ffb1d0c 100644 --- a/src/libutil/file-system.cc +++ b/src/libutil/file-system.cc @@ -522,7 +522,7 @@ AutoDelete::~AutoDelete() } } } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } diff --git a/src/libutil/fmt.hh b/src/libutil/fmt.hh index ee3e1e2e7..5feefdf90 100644 --- a/src/libutil/fmt.hh +++ b/src/libutil/fmt.hh @@ -136,11 +136,17 @@ inline std::string fmt(const char * s) template<typename... Args> inline std::string fmt(const std::string & fs, const Args &... args) -{ +try { boost::format f(fs); fmt_internal::setExceptions(f); (f % ... % args); return f.str(); +} catch (boost::io::format_error & fe) { + // I don't care who catches this, we do not put up with boost format errors + // Give me a stack trace and a core dump + std::cerr << "nix::fmt threw format error. Original format string: '"; + std::cerr << fs << "'; number of arguments: " << sizeof...(args) << "\n"; + std::terminate(); } /** @@ -174,15 +180,13 @@ public: std::cerr << "HintFmt received incorrect number of format args. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; // And regardless of the coredump give me a damn stacktrace. - printStackTrace(); - abort(); + std::terminate(); } } catch (boost::io::format_error & ex) { // Same thing, but for anything that happens in the member initializers. std::cerr << "HintFmt received incorrect format string. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; - printStackTrace(); - abort(); + std::terminate(); } HintFmt(const HintFmt & hf) : fmt(hf.fmt) {} diff --git a/src/libutil/logging.cc b/src/libutil/logging.cc index 7d9482814..7609e6e39 100644 --- a/src/libutil/logging.cc +++ b/src/libutil/logging.cc @@ -352,7 +352,7 @@ Activity::~Activity() try { logger.stopActivity(id); } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } diff --git a/src/libutil/meson.build b/src/libutil/meson.build index a3f21de59..afca4e021 100644 --- a/src/libutil/meson.build +++ b/src/libutil/meson.build @@ -53,6 +53,8 @@ libutil_headers = files( 'archive.hh', 'args/root.hh', 'args.hh', + 'async-collect.hh', + 'async-semaphore.hh', 'backed-string-view.hh', 'box_ptr.hh', 'canon-path.hh', diff --git a/src/libutil/processes.hh b/src/libutil/processes.hh index dc09a9ba4..dd6e2978e 100644 --- a/src/libutil/processes.hh +++ b/src/libutil/processes.hh @@ -78,11 +78,11 @@ struct RunOptions { Path program; bool searchPath = true; - Strings args; - std::optional<uid_t> uid; - std::optional<uid_t> gid; - std::optional<Path> chdir; - std::optional<std::map<std::string, std::string>> environment; + Strings args = {}; + std::optional<uid_t> uid = {}; + std::optional<uid_t> gid = {}; + std::optional<Path> chdir = {}; + std::optional<std::map<std::string, std::string>> environment = {}; bool captureStdout = false; bool mergeStderrToStdout = false; bool isInteractive = false; diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index f509fedff..2f5a11a28 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -83,7 +83,7 @@ void BufferedSink::flush() FdSink::~FdSink() { - try { flush(); } catch (...) { ignoreException(); } + try { flush(); } catch (...) { ignoreExceptionInDestructor(); } } diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 3a9685e0e..08ea9a135 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -549,7 +549,7 @@ struct FramedSource : Source } } } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } @@ -595,7 +595,7 @@ struct FramedSink : nix::BufferedSink to << 0; to.flush(); } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } diff --git a/src/libutil/signals.cc b/src/libutil/signals.cc index 04a697d01..dac2964ae 100644 --- a/src/libutil/signals.cc +++ b/src/libutil/signals.cc @@ -12,13 +12,18 @@ std::atomic<bool> _isInterrupted = false; thread_local std::function<bool()> interruptCheck; +Interrupted makeInterrupted() +{ + return Interrupted("interrupted by the user"); +} + void _interrupted() { /* Block user interrupts while an exception is being handled. Throwing an exception while another exception is being handled kills the program! */ if (!std::uncaught_exceptions()) { - throw Interrupted("interrupted by the user"); + throw makeInterrupted(); } } @@ -78,7 +83,7 @@ void triggerInterrupt() try { callback(); } catch (...) { - ignoreException(); + ignoreExceptionInDestructor(); } } } diff --git a/src/libutil/signals.hh b/src/libutil/signals.hh index 02f8d2ca3..538ff94b4 100644 --- a/src/libutil/signals.hh +++ b/src/libutil/signals.hh @@ -16,10 +16,13 @@ namespace nix { /* User interruption. */ +class Interrupted; + extern std::atomic<bool> _isInterrupted; extern thread_local std::function<bool()> interruptCheck; +Interrupted makeInterrupted(); void _interrupted(); void inline checkInterrupt() diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index 0ff83e997..1c4488373 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -109,9 +109,21 @@ void ThreadPool::doWork(bool mainThread) try { std::rethrow_exception(exc); } catch (std::exception & e) { - if (!dynamic_cast<Interrupted*>(&e) && - !dynamic_cast<ThreadPoolShutDown*>(&e)) - ignoreException(); + if (!dynamic_cast<ThreadPoolShutDown*>(&e)) { + // Yes, this is not a destructor, but we cannot + // safely propagate an exception out of here. + // + // What happens is that if we do, shutdown() + // will have join() throw an exception if we + // are on a worker thread, preventing us from + // joining the rest of the threads. Although we + // could make the joining eat exceptions too, + // we could just as well not let Interrupted + // fall out to begin with, since the thread + // will immediately cleanly quit because of + // quit == true anyway. + ignoreExceptionInDestructor(); + } } catch (...) { } } |