aboutsummaryrefslogtreecommitdiff
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/async-collect.hh101
-rw-r--r--src/libutil/async-semaphore.hh122
-rw-r--r--src/libutil/current-process.cc2
-rw-r--r--src/libutil/error.cc16
-rw-r--r--src/libutil/error.hh23
-rw-r--r--src/libutil/file-descriptor.cc2
-rw-r--r--src/libutil/file-system.cc2
-rw-r--r--src/libutil/fmt.hh14
-rw-r--r--src/libutil/logging.cc2
-rw-r--r--src/libutil/meson.build2
-rw-r--r--src/libutil/processes.hh10
-rw-r--r--src/libutil/serialise.cc2
-rw-r--r--src/libutil/serialise.hh4
-rw-r--r--src/libutil/signals.cc9
-rw-r--r--src/libutil/signals.hh3
-rw-r--r--src/libutil/thread-pool.cc18
16 files changed, 304 insertions, 28 deletions
diff --git a/src/libutil/async-collect.hh b/src/libutil/async-collect.hh
new file mode 100644
index 000000000..9e0b8bad9
--- /dev/null
+++ b/src/libutil/async-collect.hh
@@ -0,0 +1,101 @@
+#pragma once
+/// @file
+
+#include <kj/async.h>
+#include <kj/common.h>
+#include <kj/vector.h>
+#include <list>
+#include <optional>
+#include <type_traits>
+
+namespace nix {
+
+template<typename K, typename V>
+class AsyncCollect
+{
+public:
+ using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>;
+
+private:
+ kj::ForkedPromise<void> allPromises;
+ std::list<Item> results;
+ size_t remaining;
+
+ kj::ForkedPromise<void> signal;
+ kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify;
+
+ void oneDone(Item item)
+ {
+ results.emplace_back(std::move(item));
+ remaining -= 1;
+ KJ_IF_MAYBE (n, notify) {
+ (*n)->fulfill();
+ notify = nullptr;
+ }
+ }
+
+ kj::Promise<void> collectorFor(K key, kj::Promise<V> promise)
+ {
+ if constexpr (std::is_void_v<V>) {
+ return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); });
+ } else {
+ return promise.then([this, key{std::move(key)}](V v) {
+ oneDone(Item{std::move(key), std::move(v)});
+ });
+ }
+ }
+
+ kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises)
+ {
+ kj::Vector<kj::Promise<void>> wrappers;
+ for (auto & [key, promise] : promises) {
+ wrappers.add(collectorFor(std::move(key), std::move(promise)));
+ }
+
+ return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork();
+ }
+
+public:
+ AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises)
+ : allPromises(waitForAll(promises))
+ , remaining(promises.size())
+ , signal{nullptr}
+ {
+ }
+
+ kj::Promise<std::optional<Item>> next()
+ {
+ if (remaining == 0 && results.empty()) {
+ return {std::nullopt};
+ }
+
+ if (!results.empty()) {
+ auto result = std::move(results.front());
+ results.pop_front();
+ return {{std::move(result)}};
+ }
+
+ if (notify == nullptr) {
+ auto pair = kj::newPromiseAndFulfiller<void>();
+ notify = std::move(pair.fulfiller);
+ signal = pair.promise.fork();
+ }
+
+ return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] {
+ return next();
+ });
+ }
+};
+
+/**
+ * Collect the results of a list of promises, in order of completion.
+ * Once any input promise is rejected all promises that have not been
+ * resolved or rejected will be cancelled and the exception rethrown.
+ */
+template<typename K, typename V>
+AsyncCollect<K, V> asyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> promises)
+{
+ return AsyncCollect<K, V>(std::move(promises));
+}
+
+}
diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh
new file mode 100644
index 000000000..f8db31a68
--- /dev/null
+++ b/src/libutil/async-semaphore.hh
@@ -0,0 +1,122 @@
+#pragma once
+/// @file
+/// @brief A semaphore implementation usable from within a KJ event loop.
+
+#include <cassert>
+#include <kj/async.h>
+#include <kj/common.h>
+#include <kj/exception.h>
+#include <kj/list.h>
+#include <kj/source-location.h>
+#include <memory>
+#include <optional>
+
+namespace nix {
+
+class AsyncSemaphore
+{
+public:
+ class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token
+ {
+ struct Release
+ {
+ void operator()(AsyncSemaphore * sem) const
+ {
+ sem->unsafeRelease();
+ }
+ };
+
+ std::unique_ptr<AsyncSemaphore, Release> parent;
+
+ public:
+ Token() = default;
+ Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {}
+
+ bool valid() const
+ {
+ return parent != nullptr;
+ }
+ };
+
+private:
+ struct Waiter
+ {
+ kj::PromiseFulfiller<Token> & fulfiller;
+ kj::ListLink<Waiter> link;
+ kj::List<Waiter, &Waiter::link> & list;
+
+ Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list)
+ : fulfiller(fulfiller)
+ , list(list)
+ {
+ list.add(*this);
+ }
+
+ ~Waiter()
+ {
+ if (link.isLinked()) {
+ list.remove(*this);
+ }
+ }
+ };
+
+ const unsigned capacity_;
+ unsigned used_ = 0;
+ kj::List<Waiter, &Waiter::link> waiters;
+
+ void unsafeRelease()
+ {
+ used_ -= 1;
+ while (used_ < capacity_ && !waiters.empty()) {
+ used_ += 1;
+ auto & w = waiters.front();
+ w.fulfiller.fulfill(Token{*this, {}});
+ waiters.remove(w);
+ }
+ }
+
+public:
+ explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {}
+
+ KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore);
+
+ ~AsyncSemaphore()
+ {
+ assert(waiters.empty() && "destroyed a semaphore with active waiters");
+ }
+
+ std::optional<Token> tryAcquire()
+ {
+ if (used_ < capacity_) {
+ used_ += 1;
+ return Token{*this, {}};
+ } else {
+ return {};
+ }
+ }
+
+ kj::Promise<Token> acquire()
+ {
+ if (auto t = tryAcquire()) {
+ return std::move(*t);
+ } else {
+ return kj::newAdaptedPromise<Token, Waiter>(waiters);
+ }
+ }
+
+ unsigned capacity() const
+ {
+ return capacity_;
+ }
+
+ unsigned used() const
+ {
+ return used_;
+ }
+
+ unsigned available() const
+ {
+ return capacity_ - used_;
+ }
+};
+}
diff --git a/src/libutil/current-process.cc b/src/libutil/current-process.cc
index 33cda211b..3b3e46a9a 100644
--- a/src/libutil/current-process.cc
+++ b/src/libutil/current-process.cc
@@ -49,7 +49,7 @@ unsigned int getMaxCPU()
auto period = cpuMaxParts[1];
if (quota != "max")
return std::ceil(std::stoi(quota) / std::stof(period));
- } catch (Error &) { ignoreException(lvlDebug); }
+ } catch (Error &) { ignoreExceptionInDestructor(lvlDebug); }
#endif
return 0;
diff --git a/src/libutil/error.cc b/src/libutil/error.cc
index a7cbfbfd0..f57e3ef7d 100644
--- a/src/libutil/error.cc
+++ b/src/libutil/error.cc
@@ -4,6 +4,7 @@
#include "position.hh"
#include "terminal.hh"
#include "strings.hh"
+#include "signals.hh"
#include <iostream>
#include <optional>
@@ -132,7 +133,7 @@ static std::string indent(std::string_view indentFirst, std::string_view indentR
/**
* A development aid for finding missing positions, to improve error messages. Example use:
*
- * NIX_DEVELOPER_SHOW_UNKNOWN_LOCATIONS=1 _NIX_TEST_ACCEPT=1 make tests/lang.sh.test
+ * NIX_DEVELOPER_SHOW_UNKNOWN_LOCATIONS=1 _NIX_TEST_ACCEPT=1 just test --suite installcheck -v functional-lang
* git diff -U20 tests
*
*/
@@ -416,7 +417,7 @@ std::ostream & showErrorInfo(std::ostream & out, const ErrorInfo & einfo, bool s
return out;
}
-void ignoreException(Verbosity lvl)
+void ignoreExceptionInDestructor(Verbosity lvl)
{
/* Make sure no exceptions leave this function.
printError() also throws when remote is closed. */
@@ -429,4 +430,15 @@ void ignoreException(Verbosity lvl)
} catch (...) { }
}
+void ignoreExceptionExceptInterrupt(Verbosity lvl)
+{
+ try {
+ throw;
+ } catch (const Interrupted & e) {
+ throw;
+ } catch (std::exception & e) {
+ printMsg(lvl, "error (ignored): %1%", e.what());
+ }
+}
+
}
diff --git a/src/libutil/error.hh b/src/libutil/error.hh
index 73c1ccadd..885a2b218 100644
--- a/src/libutil/error.hh
+++ b/src/libutil/error.hh
@@ -70,17 +70,17 @@ inline bool operator<=(const Trace& lhs, const Trace& rhs);
inline bool operator>=(const Trace& lhs, const Trace& rhs);
struct ErrorInfo {
- Verbosity level;
+ Verbosity level = Verbosity::lvlError;
HintFmt msg;
std::shared_ptr<Pos> pos;
- std::list<Trace> traces;
+ std::list<Trace> traces = {};
/**
* Exit status.
*/
unsigned int status = 1;
- Suggestions suggestions;
+ Suggestions suggestions = {};
static std::optional<std::string> programName;
};
@@ -204,7 +204,22 @@ public:
/**
* Exception handling in destructors: print an error message, then
* ignore the exception.
+ *
+ * If you're not in a destructor, you usually want to use `ignoreExceptionExceptInterrupt()`.
+ *
+ * This function might also be used in callbacks whose caller may not handle exceptions,
+ * but ideally we propagate the exception using an exception_ptr in such cases.
+ * See e.g. `PackBuilderContext`
+ */
+void ignoreExceptionInDestructor(Verbosity lvl = lvlError);
+
+/**
+ * Not destructor-safe.
+ * Print an error message, then ignore the exception.
+ * If the exception is an `Interrupted` exception, rethrow it.
+ *
+ * This may be used in a few places where Interrupt can't happen, but that's ok.
*/
-void ignoreException(Verbosity lvl = lvlError);
+void ignoreExceptionExceptInterrupt(Verbosity lvl = lvlError);
}
diff --git a/src/libutil/file-descriptor.cc b/src/libutil/file-descriptor.cc
index 8385ea402..cbb2bb539 100644
--- a/src/libutil/file-descriptor.cc
+++ b/src/libutil/file-descriptor.cc
@@ -146,7 +146,7 @@ AutoCloseFD::~AutoCloseFD()
try {
close();
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libutil/file-system.cc b/src/libutil/file-system.cc
index 1d3eba58f..c4ffb1d0c 100644
--- a/src/libutil/file-system.cc
+++ b/src/libutil/file-system.cc
@@ -522,7 +522,7 @@ AutoDelete::~AutoDelete()
}
}
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libutil/fmt.hh b/src/libutil/fmt.hh
index ee3e1e2e7..5feefdf90 100644
--- a/src/libutil/fmt.hh
+++ b/src/libutil/fmt.hh
@@ -136,11 +136,17 @@ inline std::string fmt(const char * s)
template<typename... Args>
inline std::string fmt(const std::string & fs, const Args &... args)
-{
+try {
boost::format f(fs);
fmt_internal::setExceptions(f);
(f % ... % args);
return f.str();
+} catch (boost::io::format_error & fe) {
+ // I don't care who catches this, we do not put up with boost format errors
+ // Give me a stack trace and a core dump
+ std::cerr << "nix::fmt threw format error. Original format string: '";
+ std::cerr << fs << "'; number of arguments: " << sizeof...(args) << "\n";
+ std::terminate();
}
/**
@@ -174,15 +180,13 @@ public:
std::cerr << "HintFmt received incorrect number of format args. Original format string: '";
std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n";
// And regardless of the coredump give me a damn stacktrace.
- printStackTrace();
- abort();
+ std::terminate();
}
} catch (boost::io::format_error & ex) {
// Same thing, but for anything that happens in the member initializers.
std::cerr << "HintFmt received incorrect format string. Original format string: '";
std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n";
- printStackTrace();
- abort();
+ std::terminate();
}
HintFmt(const HintFmt & hf) : fmt(hf.fmt) {}
diff --git a/src/libutil/logging.cc b/src/libutil/logging.cc
index 7d9482814..7609e6e39 100644
--- a/src/libutil/logging.cc
+++ b/src/libutil/logging.cc
@@ -352,7 +352,7 @@ Activity::~Activity()
try {
logger.stopActivity(id);
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libutil/meson.build b/src/libutil/meson.build
index a3f21de59..afca4e021 100644
--- a/src/libutil/meson.build
+++ b/src/libutil/meson.build
@@ -53,6 +53,8 @@ libutil_headers = files(
'archive.hh',
'args/root.hh',
'args.hh',
+ 'async-collect.hh',
+ 'async-semaphore.hh',
'backed-string-view.hh',
'box_ptr.hh',
'canon-path.hh',
diff --git a/src/libutil/processes.hh b/src/libutil/processes.hh
index dc09a9ba4..dd6e2978e 100644
--- a/src/libutil/processes.hh
+++ b/src/libutil/processes.hh
@@ -78,11 +78,11 @@ struct RunOptions
{
Path program;
bool searchPath = true;
- Strings args;
- std::optional<uid_t> uid;
- std::optional<uid_t> gid;
- std::optional<Path> chdir;
- std::optional<std::map<std::string, std::string>> environment;
+ Strings args = {};
+ std::optional<uid_t> uid = {};
+ std::optional<uid_t> gid = {};
+ std::optional<Path> chdir = {};
+ std::optional<std::map<std::string, std::string>> environment = {};
bool captureStdout = false;
bool mergeStderrToStdout = false;
bool isInteractive = false;
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index f509fedff..2f5a11a28 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -83,7 +83,7 @@ void BufferedSink::flush()
FdSink::~FdSink()
{
- try { flush(); } catch (...) { ignoreException(); }
+ try { flush(); } catch (...) { ignoreExceptionInDestructor(); }
}
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 3a9685e0e..08ea9a135 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -549,7 +549,7 @@ struct FramedSource : Source
}
}
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
@@ -595,7 +595,7 @@ struct FramedSink : nix::BufferedSink
to << 0;
to.flush();
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libutil/signals.cc b/src/libutil/signals.cc
index 04a697d01..dac2964ae 100644
--- a/src/libutil/signals.cc
+++ b/src/libutil/signals.cc
@@ -12,13 +12,18 @@ std::atomic<bool> _isInterrupted = false;
thread_local std::function<bool()> interruptCheck;
+Interrupted makeInterrupted()
+{
+ return Interrupted("interrupted by the user");
+}
+
void _interrupted()
{
/* Block user interrupts while an exception is being handled.
Throwing an exception while another exception is being handled
kills the program! */
if (!std::uncaught_exceptions()) {
- throw Interrupted("interrupted by the user");
+ throw makeInterrupted();
}
}
@@ -78,7 +83,7 @@ void triggerInterrupt()
try {
callback();
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
}
diff --git a/src/libutil/signals.hh b/src/libutil/signals.hh
index 02f8d2ca3..538ff94b4 100644
--- a/src/libutil/signals.hh
+++ b/src/libutil/signals.hh
@@ -16,10 +16,13 @@ namespace nix {
/* User interruption. */
+class Interrupted;
+
extern std::atomic<bool> _isInterrupted;
extern thread_local std::function<bool()> interruptCheck;
+Interrupted makeInterrupted();
void _interrupted();
void inline checkInterrupt()
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 0ff83e997..1c4488373 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -109,9 +109,21 @@ void ThreadPool::doWork(bool mainThread)
try {
std::rethrow_exception(exc);
} catch (std::exception & e) {
- if (!dynamic_cast<Interrupted*>(&e) &&
- !dynamic_cast<ThreadPoolShutDown*>(&e))
- ignoreException();
+ if (!dynamic_cast<ThreadPoolShutDown*>(&e)) {
+ // Yes, this is not a destructor, but we cannot
+ // safely propagate an exception out of here.
+ //
+ // What happens is that if we do, shutdown()
+ // will have join() throw an exception if we
+ // are on a worker thread, preventing us from
+ // joining the rest of the threads. Although we
+ // could make the joining eat exceptions too,
+ // we could just as well not let Interrupted
+ // fall out to begin with, since the thread
+ // will immediately cleanly quit because of
+ // quit == true anyway.
+ ignoreExceptionInDestructor();
+ }
} catch (...) {
}
}