aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libutil/thread-pool.cc75
-rw-r--r--src/libutil/thread-pool.hh2
2 files changed, 53 insertions, 24 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 0a3a40724..f43dbe0c3 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -46,11 +46,17 @@ void ThreadPool::enqueue(const work_t & t)
void ThreadPool::process()
{
+ /* Loop until there are no active work items *and* there either
+ are no queued items or there is an exception. The
+ post-condition is that no new items will become active. */
while (true) {
auto state(state_.lock());
- if (state->exception)
- std::rethrow_exception(state->exception);
- if (state->left.empty() && !state->pending) break;
+ if (!state->active) {
+ if (state->exception)
+ std::rethrow_exception(state->exception);
+ if (state->left.empty())
+ break;
+ }
state.wait(done);
}
}
@@ -58,41 +64,64 @@ void ThreadPool::process()
void ThreadPool::workerEntry()
{
bool didWork = false;
+ std::exception_ptr exc;
while (true) {
work_t w;
{
auto state(state_.lock());
+
+ if (didWork) {
+ assert(state->active);
+ state->active--;
+
+ if (exc) {
+
+ if (!state->exception) {
+ state->exception = exc;
+ // Tell the other workers to quit.
+ state->quit = true;
+ work.notify_all();
+ } else {
+ /* Print the exception, since we can't
+ propagate it. */
+ try {
+ std::rethrow_exception(exc);
+ } catch (std::exception & e) {
+ if (!dynamic_cast<Interrupted*>(&e) &&
+ !dynamic_cast<ThreadPoolShutDown*>(&e))
+ ignoreException();
+ } catch (...) {
+ }
+ }
+ }
+ }
+
+ /* Wait until a work item is available or another thread
+ had an exception or we're asked to quit. */
while (true) {
- if (state->quit || state->exception) return;
- if (didWork) {
- assert(state->pending);
- state->pending--;
- didWork = false;
+ if (state->quit) {
+ if (!state->active)
+ done.notify_one();
+ return;
}
if (!state->left.empty()) break;
- if (!state->pending)
- done.notify_all();
+ if (!state->active) {
+ done.notify_one();
+ return;
+ }
state.wait(work);
}
- w = state->left.front();
+
+ w = std::move(state->left.front());
state->left.pop();
- state->pending++;
+ state->active++;
}
try {
w();
- } catch (std::exception & e) {
- auto state(state_.lock());
- if (state->exception) {
- if (!dynamic_cast<Interrupted*>(&e) &&
- !dynamic_cast<ThreadPoolShutDown*>(&e))
- printError(format("error: %s") % e.what());
- } else {
- state->exception = std::current_exception();
- work.notify_all();
- done.notify_all();
- }
+ } catch (...) {
+ exc = std::current_exception();
}
didWork = true;
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 361a9d33a..835dfb4b8 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -44,7 +44,7 @@ private:
struct State
{
std::queue<work_t> left;
- size_t pending = 0;
+ size_t active = 0;
std::exception_ptr exception;
std::vector<std::thread> workers;
bool quit = false;