diff options
-rw-r--r-- | mk/tests.mk | 4 | ||||
-rw-r--r-- | release-common.nix | 4 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 81 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 9 |
4 files changed, 61 insertions, 37 deletions
diff --git a/mk/tests.mk b/mk/tests.mk index d18b91058..1138857c3 100644 --- a/mk/tests.mk +++ b/mk/tests.mk @@ -26,9 +26,9 @@ installcheck: printf "running test $$i..."; \ log="$$(cd $$(dirname $$i) && $(tests-environment) $$(basename $$i) 2>&1)"; \ if [ $$? -eq 0 ]; then \ - echo "[$${green}PASS$$normal]"; \ + echo " [$${green}PASS$$normal]"; \ else \ - echo "[$${red}FAIL$$normal]"; \ + echo " [$${red}FAIL$$normal]"; \ echo "$$log" | sed 's/^/ /'; \ failed=$$((failed + 1)); \ fi; \ diff --git a/release-common.nix b/release-common.nix index c64fc619d..4553118e1 100644 --- a/release-common.nix +++ b/release-common.nix @@ -7,8 +7,8 @@ rec { enableMinimal = true; extraConfig = '' CONFIG_ASH y - CONFIG_ASH_BUILTIN_ECHO y - CONFIG_ASH_BUILTIN_TEST y + CONFIG_ASH_ECHO y + CONFIG_ASH_TEST y CONFIG_ASH_OPTIMIZE_FOR_SIZE y ''; }; diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index ce126d36d..857ee91f8 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -13,11 +13,16 @@ ThreadPool::ThreadPool(size_t _maxThreads) if (!maxThreads) maxThreads = 1; } - debug(format("starting pool of %d threads") % maxThreads); + debug("starting pool of %d threads", maxThreads - 1); } ThreadPool::~ThreadPool() { + shutdown(); +} + +void ThreadPool::shutdown() +{ std::vector<std::thread> workers; { auto state(state_.lock()); @@ -25,7 +30,9 @@ ThreadPool::~ThreadPool() std::swap(workers, state->workers); } - debug(format("reaping %d worker threads") % workers.size()); + if (workers.empty()) return; + + debug("reaping %d worker threads", workers.size()); work.notify_all(); @@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t) auto state(state_.lock()); if (quit) throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); - state->left.push(t); - if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) - state->workers.emplace_back(&ThreadPool::workerEntry, this); + state->pending.push(t); + /* Note: process() also executes items, so count it as a worker. */ + if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads) + state->workers.emplace_back(&ThreadPool::doWork, this, false); work.notify_one(); } void ThreadPool::process() { - /* Loop until there are no active work items *and* there either - are no queued items or there is an exception. The - post-condition is that no new items will become active. */ - while (true) { + state_.lock()->draining = true; + + /* Do work until no more work is pending or active. */ + try { + doWork(true); + auto state(state_.lock()); - if (!state->active) { - if (state->exception) - std::rethrow_exception(state->exception); - if (state->left.empty()) - break; - } - state.wait(done); + + assert(quit); + + if (state->exception) + std::rethrow_exception(state->exception); + + } catch (...) { + /* In the exceptional case, some workers may still be + active. They may be referencing the stack frame of the + caller. So wait for them to finish. (~ThreadPool also does + this, but it might be destroyed after objects referenced by + the work item lambdas.) */ + shutdown(); + throw; } } -void ThreadPool::workerEntry() +void ThreadPool::doWork(bool mainThread) { - interruptCheck = [&]() { return (bool) quit; }; + if (!mainThread) + interruptCheck = [&]() { return (bool) quit; }; bool didWork = false; std::exception_ptr exc; @@ -99,24 +117,27 @@ void ThreadPool::workerEntry() } } - /* Wait until a work item is available or another thread - had an exception or we're asked to quit. */ + /* Wait until a work item is available or we're asked to + quit. */ while (true) { - if (quit) { - if (!state->active) - done.notify_one(); - return; - } - if (!state->left.empty()) break; - if (!state->active) { - done.notify_one(); + if (quit) return; + + if (!state->pending.empty()) break; + + /* If there are no active or pending items, and the + main thread is running process(), then no new items + can be added. So exit. */ + if (!state->active && state->draining) { + quit = true; + work.notify_all(); return; } + state.wait(work); } - w = std::move(state->left.front()); - state->left.pop(); + w = std::move(state->pending.front()); + state->pending.pop(); state->active++; } diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 06a097ab5..bb16b639a 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -44,19 +44,22 @@ private: struct State { - std::queue<work_t> left; + std::queue<work_t> pending; size_t active = 0; std::exception_ptr exception; std::vector<std::thread> workers; + bool draining = false; }; std::atomic_bool quit{false}; Sync<State> state_; - std::condition_variable work, done; + std::condition_variable work; - void workerEntry(); + void doWork(bool mainThread); + + void shutdown(); }; /* Process in parallel a set of items of type T that have a partial |