diff options
author | eldritch horrors <pennae@lix.systems> | 2024-09-01 01:37:10 +0200 |
---|---|---|
committer | eldritch horrors <pennae@lix.systems> | 2024-09-27 16:39:33 +0200 |
commit | cd1ceffb0ee9544bf14453f94da6b6f0d52f10cd (patch) | |
tree | eb055f15f9bf62ebb9fbb373056bd48703769203 /src/libstore/build | |
parent | 0478949c72310b9749d5b959adad8bdf5c2c0841 (diff) |
libstore: make waiting for a while a promise
this simplifies waitForInput quite a lot, and at the same time makes
polling less thundering-herd-y. it even fixes early polling wakeups!
Change-Id: I6dfa62ce91729b8880342117d71af5ae33366414
Diffstat (limited to 'src/libstore/build')
-rw-r--r-- | src/libstore/build/derivation-goal.cc | 14 | ||||
-rw-r--r-- | src/libstore/build/goal.cc | 13 | ||||
-rw-r--r-- | src/libstore/build/goal.hh | 7 | ||||
-rw-r--r-- | src/libstore/build/local-derivation-goal.cc | 2 | ||||
-rw-r--r-- | src/libstore/build/worker.cc | 63 | ||||
-rw-r--r-- | src/libstore/build/worker.hh | 20 |
6 files changed, 51 insertions, 68 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index f40611b31..c95092913 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -736,7 +736,7 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } actLock.reset(); @@ -776,32 +776,32 @@ try { auto hookReply = tryBuildHook(inBuildSlot); auto result = std::visit( overloaded{ - [&](HookReply::Accept & a) -> std::optional<WorkResult> { + [&](HookReply::Accept & a) -> std::optional<kj::Promise<Result<WorkResult>>> { /* Yes, it has started doing so. Wait until we get EOF from the hook. */ actLock.reset(); buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return WaitForWorld{std::move(a.promise), false}; + return {{WaitForWorld{std::move(a.promise), false}}}; }, - [&](HookReply::Postpone) -> std::optional<WorkResult> { + [&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> { /* Not now; wait until at least one child finishes or the wake-up timeout expires. */ if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); outputLocks.unlock(); - return WaitForAWhile{}; + return waitForAWhile(); }, - [&](HookReply::Decline) -> std::optional<WorkResult> { + [&](HookReply::Decline) -> std::optional<kj::Promise<Result<WorkResult>>> { /* We should do it ourselves. */ return std::nullopt; }, }, hookReply); if (result) { - return {std::move(*result)}; + return std::move(*result); } } diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 82861ad2b..649093dbd 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -1,4 +1,6 @@ #include "goal.hh" +#include "worker.hh" +#include <kj/time.h> namespace nix { @@ -15,4 +17,15 @@ void Goal::trace(std::string_view s) debug("%1%: %2%", name, s); } +kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile() +try { + trace("wait for a while"); + /* If we are polling goals that are waiting for a lock, then wake + up after a few seconds at most. */ + co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS); + co_return ContinueImmediately{}; +} catch (...) { + co_return std::current_exception(); +} + } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 3f6e8396e..fbf767e8d 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -118,7 +118,6 @@ public: struct [[nodiscard]] StillAlive {}; struct [[nodiscard]] WaitForSlot {}; - struct [[nodiscard]] WaitForAWhile {}; struct [[nodiscard]] ContinueImmediately {}; struct [[nodiscard]] WaitForGoals { Goals goals; @@ -140,7 +139,6 @@ public: struct [[nodiscard]] WorkResult : std::variant< StillAlive, WaitForSlot, - WaitForAWhile, ContinueImmediately, WaitForGoals, WaitForWorld, @@ -150,6 +148,11 @@ public: using variant::variant; }; +protected: + kj::Promise<Result<WorkResult>> waitForAWhile(); + +public: + /** * Exception containing an error message, if any. */ diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index 040fa7461..9ec87f1b6 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -212,7 +212,7 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } } diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 284adbc50..27d8e6ee1 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -32,7 +32,6 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) /* Debugging: prevent recursive workers. */ nrLocalBuilds = 0; nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -212,7 +211,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) overloaded{ [&](Goal::StillAlive) {}, [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, [&](Goal::ContinueImmediately) { wakeUp(goal); }, [&](Goal::WaitForGoals & w) { for (auto & dep : w.goals) { @@ -221,12 +219,25 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) } }, [&](Goal::WaitForWorld & w) { - childStarted(goal, std::move(w.promise), w.inBuildSlot); + childStarted( + goal, + w.promise.then([](auto r) -> Result<Goal::WorkResult> { + if (r.has_value()) { + return {Goal::ContinueImmediately{}}; + } else if (r.has_error()) { + return {std::move(r).error()}; + } else { + return r.exception(); + } + }), + w.inBuildSlot + ); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how ); + updateStatistics(); } void Worker::removeGoal(GoalPtr goal) @@ -257,17 +268,15 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise, +void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise, bool inBuildSlot) { children.add(promise .then([this, goal](auto result) { if (result.has_value()) { - handleWorkResult(goal, Goal::ContinueImmediately{}); - } else if (result.has_error()) { - handleWorkResult(goal, std::move(result.assume_error())); + handleWorkResult(goal, std::move(result.assume_value())); } else { - childException = result.assume_exception(); + childException = result.assume_error(); } }) .attach(Finally{[this, goal, inBuildSlot] { @@ -331,13 +340,6 @@ void Worker::waitForBuildSlot(GoalPtr goal) } -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); -} - - void Worker::updateStatistics() { // only update progress info while running. this notably excludes updating @@ -397,8 +399,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) const bool inSlot = goal->jobCategory() == JobCategory::Substitution ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); + auto result = goal->work(inSlot); + if (result.poll(aio.waitScope)) { + handleWorkResult(goal, result.wait(aio.waitScope).value()); + } else { + childStarted(goal, std::move(result), false); + } if (topGoals.empty()) break; // stuff may have been cancelled } @@ -407,7 +413,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.isEmpty() || !waitingForAWhile.empty()) + if (!children.isEmpty()) waitForInput(); else { assert(!awake.empty()); @@ -445,22 +451,12 @@ void Worker::waitForInput() terminated. */ std::optional<long> timeout = 0; - auto before = steady_time_point::clock::now(); // Periodicallty wake up to see if we need to run the garbage collector. if (settings.minFree.get() != 0) { timeout = 10; } - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast<std::chrono::seconds>( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - if (timeout) vomit("sleeping %d seconds", *timeout); @@ -475,17 +471,6 @@ void Worker::waitForInput() }(); waitFor.wait(aio.waitScope); - - auto after = steady_time_point::clock::now(); - - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); - } } diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 37d80ba7b..daa612c06 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -118,16 +118,6 @@ private: std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; /** - * Goals sleeping for a few seconds (polling a lock). - */ - WeakGoals waitingForAWhile; - - /** - * Last time the goals in `waitingForAWhile` where woken up. - */ - steady_time_point lastWokenUp; - - /** * Cache for pathContentsGood(). */ std::map<StorePath, bool> pathContentsGoodCache; @@ -165,14 +155,6 @@ private: void waitForBuildSlot(GoalPtr goal); /** - * Wait for a few seconds and then retry this goal. Used when - * waiting for a lock held by another process. This kind of - * polling is inefficient, but POSIX doesn't really provide a way - * to wait for multiple locks in the main select() loop. - */ - void waitForAWhile(GoalPtr goal); - - /** * Wake up a goal (i.e., there is something for it to do). */ void wakeUp(GoalPtr goal); @@ -191,7 +173,7 @@ private: * Registers a running child process. `inBuildSlot` means that * the process counts towards the jobs limit. */ - void childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise, + void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise, bool inBuildSlot); /** |