aboutsummaryrefslogtreecommitdiff
path: root/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/thread-pool.cc')
-rw-r--r--src/libutil/thread-pool.cc81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
new file mode 100644
index 000000000..819aed748
--- /dev/null
+++ b/src/libutil/thread-pool.cc
@@ -0,0 +1,81 @@
+#include "thread-pool.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _nrThreads)
+ : nrThreads(_nrThreads)
+{
+ if (!nrThreads) {
+ nrThreads = std::thread::hardware_concurrency();
+ if (!nrThreads) nrThreads = 1;
+ }
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+ auto state_(state.lock());
+ state_->left.push(t);
+ wakeup.notify_one();
+}
+
+void ThreadPool::process()
+{
+ printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
+
+ std::vector<std::thread> workers;
+
+ for (size_t n = 0; n < nrThreads; n++)
+ workers.push_back(std::thread([&]() {
+ bool first = true;
+
+ while (true) {
+ work_t work;
+ {
+ auto state_(state.lock());
+ if (state_->exception) return;
+ if (!first) {
+ assert(state_->pending);
+ state_->pending--;
+ }
+ first = false;
+ while (state_->left.empty()) {
+ if (!state_->pending) {
+ wakeup.notify_all();
+ return;
+ }
+ if (state_->exception) return;
+ state_.wait(wakeup);
+ }
+ work = state_->left.front();
+ state_->left.pop();
+ state_->pending++;
+ }
+
+ try {
+ work();
+ } catch (std::exception & e) {
+ auto state_(state.lock());
+ if (state_->exception)
+ printMsg(lvlError, format("error: %s") % e.what());
+ else {
+ state_->exception = std::current_exception();
+ wakeup.notify_all();
+ }
+ }
+ }
+
+ }));
+
+ for (auto & thr : workers)
+ thr.join();
+
+ {
+ auto state_(state.lock());
+ if (state_->exception)
+ std::rethrow_exception(state_->exception);
+ }
+}
+
+}
+
+