aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libutil/thread-pool.cc81
-rw-r--r--src/libutil/thread-pool.hh9
2 files changed, 57 insertions, 33 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index ce126d36d..857ee91f8 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -13,11 +13,16 @@ ThreadPool::ThreadPool(size_t _maxThreads)
if (!maxThreads) maxThreads = 1;
}
- debug(format("starting pool of %d threads") % maxThreads);
+ debug("starting pool of %d threads", maxThreads - 1);
}
ThreadPool::~ThreadPool()
{
+ shutdown();
+}
+
+void ThreadPool::shutdown()
+{
std::vector<std::thread> workers;
{
auto state(state_.lock());
@@ -25,7 +30,9 @@ ThreadPool::~ThreadPool()
std::swap(workers, state->workers);
}
- debug(format("reaping %d worker threads") % workers.size());
+ if (workers.empty()) return;
+
+ debug("reaping %d worker threads", workers.size());
work.notify_all();
@@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t)
auto state(state_.lock());
if (quit)
throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
- state->left.push(t);
- if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
- state->workers.emplace_back(&ThreadPool::workerEntry, this);
+ state->pending.push(t);
+ /* Note: process() also executes items, so count it as a worker. */
+ if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
+ state->workers.emplace_back(&ThreadPool::doWork, this, false);
work.notify_one();
}
void ThreadPool::process()
{
- /* Loop until there are no active work items *and* there either
- are no queued items or there is an exception. The
- post-condition is that no new items will become active. */
- while (true) {
+ state_.lock()->draining = true;
+
+ /* Do work until no more work is pending or active. */
+ try {
+ doWork(true);
+
auto state(state_.lock());
- if (!state->active) {
- if (state->exception)
- std::rethrow_exception(state->exception);
- if (state->left.empty())
- break;
- }
- state.wait(done);
+
+ assert(quit);
+
+ if (state->exception)
+ std::rethrow_exception(state->exception);
+
+ } catch (...) {
+ /* In the exceptional case, some workers may still be
+ active. They may be referencing the stack frame of the
+ caller. So wait for them to finish. (~ThreadPool also does
+ this, but it might be destroyed after objects referenced by
+ the work item lambdas.) */
+ shutdown();
+ throw;
}
}
-void ThreadPool::workerEntry()
+void ThreadPool::doWork(bool mainThread)
{
- interruptCheck = [&]() { return (bool) quit; };
+ if (!mainThread)
+ interruptCheck = [&]() { return (bool) quit; };
bool didWork = false;
std::exception_ptr exc;
@@ -99,24 +117,27 @@ void ThreadPool::workerEntry()
}
}
- /* Wait until a work item is available or another thread
- had an exception or we're asked to quit. */
+ /* Wait until a work item is available or we're asked to
+ quit. */
while (true) {
- if (quit) {
- if (!state->active)
- done.notify_one();
- return;
- }
- if (!state->left.empty()) break;
- if (!state->active) {
- done.notify_one();
+ if (quit) return;
+
+ if (!state->pending.empty()) break;
+
+ /* If there are no active or pending items, and the
+ main thread is running process(), then no new items
+ can be added. So exit. */
+ if (!state->active && state->draining) {
+ quit = true;
+ work.notify_all();
return;
}
+
state.wait(work);
}
- w = std::move(state->left.front());
- state->left.pop();
+ w = std::move(state->pending.front());
+ state->pending.pop();
state->active++;
}
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 06a097ab5..bb16b639a 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -44,19 +44,22 @@ private:
struct State
{
- std::queue<work_t> left;
+ std::queue<work_t> pending;
size_t active = 0;
std::exception_ptr exception;
std::vector<std::thread> workers;
+ bool draining = false;
};
std::atomic_bool quit{false};
Sync<State> state_;
- std::condition_variable work, done;
+ std::condition_variable work;
- void workerEntry();
+ void doWork(bool mainThread);
+
+ void shutdown();
};
/* Process in parallel a set of items of type T that have a partial