diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-04-22 18:19:17 +0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-04-22 18:19:17 +0200 |
commit | b2ce6fde5a46b0ebf32139cbbfe97294120f3a90 (patch) | |
tree | 7f1104a89f6f83681368d4cfe0c9f87ab944ab38 /src/libutil | |
parent | 58c84cda3bdf537dd68ecf5c6bfeb7e942472396 (diff) |
ThreadPool: Start doing work as soon as work items are enqueued
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/thread-pool.cc | 130 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 13 |
2 files changed, 84 insertions, 59 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index 743038b58..32363ecf0 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -1,79 +1,99 @@ #include "thread-pool.hh" +#include "affinity.hh" namespace nix { -ThreadPool::ThreadPool(size_t _nrThreads) - : nrThreads(_nrThreads) +ThreadPool::ThreadPool(size_t _maxThreads) + : maxThreads(_maxThreads) { - if (!nrThreads) { - nrThreads = std::thread::hardware_concurrency(); - if (!nrThreads) nrThreads = 1; + restoreAffinity(); // FIXME + + if (!maxThreads) { + maxThreads = std::thread::hardware_concurrency(); + if (!maxThreads) maxThreads = 1; } + + debug(format("starting pool of %d threads") % maxThreads); +} + +ThreadPool::~ThreadPool() +{ + std::vector<std::thread> workers; + { + auto state(state_.lock()); + state->quit = true; + std::swap(workers, state->workers); + } + + debug(format("reaping %d worker threads") % workers.size()); + + work.notify_all(); + + for (auto & thr : workers) + thr.join(); } void ThreadPool::enqueue(const work_t & t) { - auto state_(state.lock()); - state_->left.push(t); - wakeup.notify_one(); + auto state(state_.lock()); + assert(!state->quit); + state->left.push(t); + if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) + state->workers.emplace_back(&ThreadPool::workerEntry, this); + work.notify_one(); } void ThreadPool::process() { - printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads); - - std::vector<std::thread> workers; + while (true) { + auto state(state_.lock()); + if (state->exception) + std::rethrow_exception(state->exception); + if (state->left.empty() && !state->pending) break; + state.wait(done); + } +} - for (size_t n = 0; n < nrThreads; n++) - workers.push_back(std::thread([&]() { - bool first = true; +void ThreadPool::workerEntry() +{ + bool didWork = false; + while (true) { + work_t w; + { + auto state(state_.lock()); while (true) { - work_t work; - { - auto state_(state.lock()); - if (state_->exception) return; - if (!first) { - assert(state_->pending); - state_->pending--; - } - first = false; - while (state_->left.empty()) { - if (!state_->pending) { - wakeup.notify_all(); - return; - } - if (state_->exception) return; - state_.wait(wakeup); - } - work = state_->left.front(); - state_->left.pop(); - state_->pending++; - } - - try { - work(); - } catch (std::exception & e) { - auto state_(state.lock()); - if (state_->exception) { - if (!dynamic_cast<Interrupted*>(&e)) - printMsg(lvlError, format("error: %s") % e.what()); - } else { - state_->exception = std::current_exception(); - wakeup.notify_all(); - } + if (state->quit || state->exception) return; + if (didWork) { + assert(state->pending); + state->pending--; + didWork = false; } + if (!state->left.empty()) break; + if (!state->pending) + done.notify_all(); + state.wait(work); } + w = state->left.front(); + state->left.pop(); + state->pending++; + } - })); - - for (auto & thr : workers) - thr.join(); + try { + w(); + } catch (std::exception & e) { + auto state(state_.lock()); + if (state->exception) { + if (!dynamic_cast<Interrupted*>(&e)) + printMsg(lvlError, format("error: %s") % e.what()); + } else { + state->exception = std::current_exception(); + work.notify_all(); + done.notify_all(); + } + } - { - auto state_(state.lock()); - if (state_->exception) - std::rethrow_exception(state_->exception); + didWork = true; } } diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 77641d88b..939bcf1ef 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -15,7 +15,9 @@ class ThreadPool { public: - ThreadPool(size_t nrThreads = 0); + ThreadPool(size_t maxThreads = 0); + + ~ThreadPool(); // FIXME: use std::packaged_task? typedef std::function<void()> work_t; @@ -34,19 +36,22 @@ public: private: - size_t nrThreads; + size_t maxThreads; struct State { std::queue<work_t> left; size_t pending = 0; std::exception_ptr exception; + std::vector<std::thread> workers; + bool quit = false; }; - Sync<State> state; + Sync<State> state_; - std::condition_variable wakeup; + std::condition_variable work, done; + void workerEntry(); }; } |