aboutsummaryrefslogtreecommitdiff
path: root/src/libutil
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-04-22 18:19:17 +0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-04-22 18:19:17 +0200
commitb2ce6fde5a46b0ebf32139cbbfe97294120f3a90 (patch)
tree7f1104a89f6f83681368d4cfe0c9f87ab944ab38 /src/libutil
parent58c84cda3bdf537dd68ecf5c6bfeb7e942472396 (diff)
ThreadPool: Start doing work as soon as work items are enqueued
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/thread-pool.cc130
-rw-r--r--src/libutil/thread-pool.hh13
2 files changed, 84 insertions, 59 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 743038b58..32363ecf0 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -1,79 +1,99 @@
#include "thread-pool.hh"
+#include "affinity.hh"
namespace nix {
-ThreadPool::ThreadPool(size_t _nrThreads)
- : nrThreads(_nrThreads)
+ThreadPool::ThreadPool(size_t _maxThreads)
+ : maxThreads(_maxThreads)
{
- if (!nrThreads) {
- nrThreads = std::thread::hardware_concurrency();
- if (!nrThreads) nrThreads = 1;
+ restoreAffinity(); // FIXME
+
+ if (!maxThreads) {
+ maxThreads = std::thread::hardware_concurrency();
+ if (!maxThreads) maxThreads = 1;
}
+
+ debug(format("starting pool of %d threads") % maxThreads);
+}
+
+ThreadPool::~ThreadPool()
+{
+ std::vector<std::thread> workers;
+ {
+ auto state(state_.lock());
+ state->quit = true;
+ std::swap(workers, state->workers);
+ }
+
+ debug(format("reaping %d worker threads") % workers.size());
+
+ work.notify_all();
+
+ for (auto & thr : workers)
+ thr.join();
}
void ThreadPool::enqueue(const work_t & t)
{
- auto state_(state.lock());
- state_->left.push(t);
- wakeup.notify_one();
+ auto state(state_.lock());
+ assert(!state->quit);
+ state->left.push(t);
+ if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
+ state->workers.emplace_back(&ThreadPool::workerEntry, this);
+ work.notify_one();
}
void ThreadPool::process()
{
- printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
-
- std::vector<std::thread> workers;
+ while (true) {
+ auto state(state_.lock());
+ if (state->exception)
+ std::rethrow_exception(state->exception);
+ if (state->left.empty() && !state->pending) break;
+ state.wait(done);
+ }
+}
- for (size_t n = 0; n < nrThreads; n++)
- workers.push_back(std::thread([&]() {
- bool first = true;
+void ThreadPool::workerEntry()
+{
+ bool didWork = false;
+ while (true) {
+ work_t w;
+ {
+ auto state(state_.lock());
while (true) {
- work_t work;
- {
- auto state_(state.lock());
- if (state_->exception) return;
- if (!first) {
- assert(state_->pending);
- state_->pending--;
- }
- first = false;
- while (state_->left.empty()) {
- if (!state_->pending) {
- wakeup.notify_all();
- return;
- }
- if (state_->exception) return;
- state_.wait(wakeup);
- }
- work = state_->left.front();
- state_->left.pop();
- state_->pending++;
- }
-
- try {
- work();
- } catch (std::exception & e) {
- auto state_(state.lock());
- if (state_->exception) {
- if (!dynamic_cast<Interrupted*>(&e))
- printMsg(lvlError, format("error: %s") % e.what());
- } else {
- state_->exception = std::current_exception();
- wakeup.notify_all();
- }
+ if (state->quit || state->exception) return;
+ if (didWork) {
+ assert(state->pending);
+ state->pending--;
+ didWork = false;
}
+ if (!state->left.empty()) break;
+ if (!state->pending)
+ done.notify_all();
+ state.wait(work);
}
+ w = state->left.front();
+ state->left.pop();
+ state->pending++;
+ }
- }));
-
- for (auto & thr : workers)
- thr.join();
+ try {
+ w();
+ } catch (std::exception & e) {
+ auto state(state_.lock());
+ if (state->exception) {
+ if (!dynamic_cast<Interrupted*>(&e))
+ printMsg(lvlError, format("error: %s") % e.what());
+ } else {
+ state->exception = std::current_exception();
+ work.notify_all();
+ done.notify_all();
+ }
+ }
- {
- auto state_(state.lock());
- if (state_->exception)
- std::rethrow_exception(state_->exception);
+ didWork = true;
}
}
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 77641d88b..939bcf1ef 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -15,7 +15,9 @@ class ThreadPool
{
public:
- ThreadPool(size_t nrThreads = 0);
+ ThreadPool(size_t maxThreads = 0);
+
+ ~ThreadPool();
// FIXME: use std::packaged_task?
typedef std::function<void()> work_t;
@@ -34,19 +36,22 @@ public:
private:
- size_t nrThreads;
+ size_t maxThreads;
struct State
{
std::queue<work_t> left;
size_t pending = 0;
std::exception_ptr exception;
+ std::vector<std::thread> workers;
+ bool quit = false;
};
- Sync<State> state;
+ Sync<State> state_;
- std::condition_variable wakeup;
+ std::condition_variable work, done;
+ void workerEntry();
};
}