aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoreldritch horrors <pennae@lix.systems>2024-09-01 01:37:10 +0200
committereldritch horrors <pennae@lix.systems>2024-09-27 16:39:33 +0200
commitcd1ceffb0ee9544bf14453f94da6b6f0d52f10cd (patch)
treeeb055f15f9bf62ebb9fbb373056bd48703769203 /src
parent0478949c72310b9749d5b959adad8bdf5c2c0841 (diff)
libstore: make waiting for a while a promise
this simplifies waitForInput quite a lot, and at the same time makes polling less thundering-herd-y. it even fixes early polling wakeups! Change-Id: I6dfa62ce91729b8880342117d71af5ae33366414
Diffstat (limited to 'src')
-rw-r--r--src/libstore/build/derivation-goal.cc14
-rw-r--r--src/libstore/build/goal.cc13
-rw-r--r--src/libstore/build/goal.hh7
-rw-r--r--src/libstore/build/local-derivation-goal.cc2
-rw-r--r--src/libstore/build/worker.cc63
-rw-r--r--src/libstore/build/worker.hh20
6 files changed, 51 insertions, 68 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index f40611b31..c95092913 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -736,7 +736,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
- return {WaitForAWhile{}};
+ return waitForAWhile();
}
actLock.reset();
@@ -776,32 +776,32 @@ try {
auto hookReply = tryBuildHook(inBuildSlot);
auto result = std::visit(
overloaded{
- [&](HookReply::Accept & a) -> std::optional<WorkResult> {
+ [&](HookReply::Accept & a) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Yes, it has started doing so. Wait until we get
EOF from the hook. */
actLock.reset();
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
- return WaitForWorld{std::move(a.promise), false};
+ return {{WaitForWorld{std::move(a.promise), false}}};
},
- [&](HookReply::Postpone) -> std::optional<WorkResult> {
+ [&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Not now; wait until at least one child finishes or
the wake-up timeout expires. */
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
outputLocks.unlock();
- return WaitForAWhile{};
+ return waitForAWhile();
},
- [&](HookReply::Decline) -> std::optional<WorkResult> {
+ [&](HookReply::Decline) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* We should do it ourselves. */
return std::nullopt;
},
},
hookReply);
if (result) {
- return {std::move(*result)};
+ return std::move(*result);
}
}
diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc
index 82861ad2b..649093dbd 100644
--- a/src/libstore/build/goal.cc
+++ b/src/libstore/build/goal.cc
@@ -1,4 +1,6 @@
#include "goal.hh"
+#include "worker.hh"
+#include <kj/time.h>
namespace nix {
@@ -15,4 +17,15 @@ void Goal::trace(std::string_view s)
debug("%1%: %2%", name, s);
}
+kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile()
+try {
+ trace("wait for a while");
+ /* If we are polling goals that are waiting for a lock, then wake
+ up after a few seconds at most. */
+ co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
+ co_return ContinueImmediately{};
+} catch (...) {
+ co_return std::current_exception();
+}
+
}
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index 3f6e8396e..fbf767e8d 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -118,7 +118,6 @@ public:
struct [[nodiscard]] StillAlive {};
struct [[nodiscard]] WaitForSlot {};
- struct [[nodiscard]] WaitForAWhile {};
struct [[nodiscard]] ContinueImmediately {};
struct [[nodiscard]] WaitForGoals {
Goals goals;
@@ -140,7 +139,6 @@ public:
struct [[nodiscard]] WorkResult : std::variant<
StillAlive,
WaitForSlot,
- WaitForAWhile,
ContinueImmediately,
WaitForGoals,
WaitForWorld,
@@ -150,6 +148,11 @@ public:
using variant::variant;
};
+protected:
+ kj::Promise<Result<WorkResult>> waitForAWhile();
+
+public:
+
/**
* Exception containing an error message, if any.
*/
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index 040fa7461..9ec87f1b6 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -212,7 +212,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
- return {WaitForAWhile{}};
+ return waitForAWhile();
}
}
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index 284adbc50..27d8e6ee1 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -32,7 +32,6 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
nrSubstitutions = 0;
- lastWokenUp = steady_time_point::min();
}
@@ -212,7 +211,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
overloaded{
[&](Goal::StillAlive) {},
[&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
- [&](Goal::WaitForAWhile) { waitForAWhile(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
[&](Goal::WaitForGoals & w) {
for (auto & dep : w.goals) {
@@ -221,12 +219,25 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
}
},
[&](Goal::WaitForWorld & w) {
- childStarted(goal, std::move(w.promise), w.inBuildSlot);
+ childStarted(
+ goal,
+ w.promise.then([](auto r) -> Result<Goal::WorkResult> {
+ if (r.has_value()) {
+ return {Goal::ContinueImmediately{}};
+ } else if (r.has_error()) {
+ return {std::move(r).error()};
+ } else {
+ return r.exception();
+ }
+ }),
+ w.inBuildSlot
+ );
},
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
);
+ updateStatistics();
}
void Worker::removeGoal(GoalPtr goal)
@@ -257,17 +268,15 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
+void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
bool inBuildSlot)
{
children.add(promise
.then([this, goal](auto result) {
if (result.has_value()) {
- handleWorkResult(goal, Goal::ContinueImmediately{});
- } else if (result.has_error()) {
- handleWorkResult(goal, std::move(result.assume_error()));
+ handleWorkResult(goal, std::move(result.assume_value()));
} else {
- childException = result.assume_exception();
+ childException = result.assume_error();
}
})
.attach(Finally{[this, goal, inBuildSlot] {
@@ -331,13 +340,6 @@ void Worker::waitForBuildSlot(GoalPtr goal)
}
-void Worker::waitForAWhile(GoalPtr goal)
-{
- debug("wait for a while");
- waitingForAWhile.insert(goal);
-}
-
-
void Worker::updateStatistics()
{
// only update progress info while running. this notably excludes updating
@@ -397,8 +399,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
const bool inSlot = goal->jobCategory() == JobCategory::Substitution
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
: nrLocalBuilds < settings.maxBuildJobs;
- handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
- updateStatistics();
+ auto result = goal->work(inSlot);
+ if (result.poll(aio.waitScope)) {
+ handleWorkResult(goal, result.wait(aio.waitScope).value());
+ } else {
+ childStarted(goal, std::move(result), false);
+ }
if (topGoals.empty()) break; // stuff may have been cancelled
}
@@ -407,7 +413,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
- if (!children.isEmpty() || !waitingForAWhile.empty())
+ if (!children.isEmpty())
waitForInput();
else {
assert(!awake.empty());
@@ -445,22 +451,12 @@ void Worker::waitForInput()
terminated. */
std::optional<long> timeout = 0;
- auto before = steady_time_point::clock::now();
// Periodicallty wake up to see if we need to run the garbage collector.
if (settings.minFree.get() != 0) {
timeout = 10;
}
- /* If we are polling goals that are waiting for a lock, then wake
- up after a few seconds at most. */
- if (!waitingForAWhile.empty()) {
- if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
- timeout = std::max(1L,
- (long) std::chrono::duration_cast<std::chrono::seconds>(
- lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
- } else lastWokenUp = steady_time_point::min();
-
if (timeout)
vomit("sleeping %d seconds", *timeout);
@@ -475,17 +471,6 @@ void Worker::waitForInput()
}();
waitFor.wait(aio.waitScope);
-
- auto after = steady_time_point::clock::now();
-
- if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
- lastWokenUp = after;
- for (auto & i : waitingForAWhile) {
- GoalPtr goal = i.lock();
- if (goal) wakeUp(goal);
- }
- waitingForAWhile.clear();
- }
}
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index 37d80ba7b..daa612c06 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -118,16 +118,6 @@ private:
std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
/**
- * Goals sleeping for a few seconds (polling a lock).
- */
- WeakGoals waitingForAWhile;
-
- /**
- * Last time the goals in `waitingForAWhile` where woken up.
- */
- steady_time_point lastWokenUp;
-
- /**
* Cache for pathContentsGood().
*/
std::map<StorePath, bool> pathContentsGoodCache;
@@ -165,14 +155,6 @@ private:
void waitForBuildSlot(GoalPtr goal);
/**
- * Wait for a few seconds and then retry this goal. Used when
- * waiting for a lock held by another process. This kind of
- * polling is inefficient, but POSIX doesn't really provide a way
- * to wait for multiple locks in the main select() loop.
- */
- void waitForAWhile(GoalPtr goal);
-
- /**
* Wake up a goal (i.e., there is something for it to do).
*/
void wakeUp(GoalPtr goal);
@@ -191,7 +173,7 @@ private:
* Registers a running child process. `inBuildSlot` means that
* the process counts towards the jobs limit.
*/
- void childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
+ void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
bool inBuildSlot);
/**