aboutsummaryrefslogtreecommitdiff
path: root/src/libutil
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-03-29 14:29:50 +0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-03-29 16:37:16 +0200
commit784ee35c80774c5f073b6b8be6ab3d4d7e38e2f1 (patch)
tree1442cbfe72d75f5cb1dd8d68706220e5821ab374 /src/libutil
parent0ebe69dc67853e9e2b2b7b22069e766a7cbc057d (diff)
Add "nix verify-paths" command
Unlike "nix-store --verify-path", this command verifies signatures in addition to store path contents, is multi-threaded (especially useful when verifying binary caches), and has a progress indicator. Example use: $ nix verify-paths --store https://cache.nixos.org -r $(type -p thunderbird) ... [17/132 checked] checking ‘/nix/store/rawakphadqrqxr6zri2rmnxh03gqkrl3-autogen-5.18.6’
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/thread-pool.cc81
-rw-r--r--src/libutil/thread-pool.hh52
-rw-r--r--src/libutil/util.cc2
-rw-r--r--src/libutil/util.hh2
4 files changed, 135 insertions, 2 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
new file mode 100644
index 000000000..819aed748
--- /dev/null
+++ b/src/libutil/thread-pool.cc
@@ -0,0 +1,81 @@
+#include "thread-pool.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _nrThreads)
+ : nrThreads(_nrThreads)
+{
+ if (!nrThreads) {
+ nrThreads = std::thread::hardware_concurrency();
+ if (!nrThreads) nrThreads = 1;
+ }
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+ auto state_(state.lock());
+ state_->left.push(t);
+ wakeup.notify_one();
+}
+
+void ThreadPool::process()
+{
+ printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
+
+ std::vector<std::thread> workers;
+
+ for (size_t n = 0; n < nrThreads; n++)
+ workers.push_back(std::thread([&]() {
+ bool first = true;
+
+ while (true) {
+ work_t work;
+ {
+ auto state_(state.lock());
+ if (state_->exception) return;
+ if (!first) {
+ assert(state_->pending);
+ state_->pending--;
+ }
+ first = false;
+ while (state_->left.empty()) {
+ if (!state_->pending) {
+ wakeup.notify_all();
+ return;
+ }
+ if (state_->exception) return;
+ state_.wait(wakeup);
+ }
+ work = state_->left.front();
+ state_->left.pop();
+ state_->pending++;
+ }
+
+ try {
+ work();
+ } catch (std::exception & e) {
+ auto state_(state.lock());
+ if (state_->exception)
+ printMsg(lvlError, format("error: %s") % e.what());
+ else {
+ state_->exception = std::current_exception();
+ wakeup.notify_all();
+ }
+ }
+ }
+
+ }));
+
+ for (auto & thr : workers)
+ thr.join();
+
+ {
+ auto state_(state.lock());
+ if (state_->exception)
+ std::rethrow_exception(state_->exception);
+ }
+}
+
+}
+
+
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
new file mode 100644
index 000000000..77641d88b
--- /dev/null
+++ b/src/libutil/thread-pool.hh
@@ -0,0 +1,52 @@
+#pragma once
+
+#include "sync.hh"
+#include "util.hh"
+
+#include <queue>
+#include <functional>
+#include <thread>
+
+namespace nix {
+
+/* A simple thread pool that executes a queue of work items
+ (lambdas). */
+class ThreadPool
+{
+public:
+
+ ThreadPool(size_t nrThreads = 0);
+
+ // FIXME: use std::packaged_task?
+ typedef std::function<void()> work_t;
+
+ /* Enqueue a function to be executed by the thread pool. */
+ void enqueue(const work_t & t);
+
+ /* Execute work items until the queue is empty. Note that work
+ items are allowed to add new items to the queue; this is
+ handled correctly. Queue processing stops prematurely if any
+ work item throws an exception. This exception is propagated to
+ the calling thread. If multiple work items throw an exception
+ concurrently, only one item is propagated; the others are
+ printed on stderr and otherwise ignored. */
+ void process();
+
+private:
+
+ size_t nrThreads;
+
+ struct State
+ {
+ std::queue<work_t> left;
+ size_t pending = 0;
+ std::exception_ptr exception;
+ };
+
+ Sync<State> state;
+
+ std::condition_variable wakeup;
+
+};
+
+}
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 25246a3e8..d4ac3fb60 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -548,7 +548,7 @@ void writeToStderr(const string & s)
}
-void (*_writeToStderr) (const unsigned char * buf, size_t count) = 0;
+std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
void readFull(int fd, unsigned char * buf, size_t count)
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 3606f6ec9..0a72cc592 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -167,7 +167,7 @@ void warnOnce(bool & haveWarned, const FormatOrString & fs);
void writeToStderr(const string & s);
-extern void (*_writeToStderr) (const unsigned char * buf, size_t count);
+extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr;
/* Wrappers arount read()/write() that read/write exactly the