aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build
diff options
context:
space:
mode:
authoreldritch horrors <pennae@lix.systems>2024-09-24 00:21:16 +0200
committereldritch horrors <pennae@lix.systems>2024-09-27 16:40:27 +0200
commit852da07b67564f7a9986f0638aac391d334d4afa (patch)
tree993d896dd8353a664723206c2f3ea59230f3d1f5 /src/libstore/build
parentbf32085d63ccfa8fb1e0cff2f2ae7156b4679015 (diff)
libstore: replace Goal::WaitForSlot with semaphores
now that we have an event loop in the worker we can use it and its magical execution suspending properties to replace the slot counts we managed explicitly with semaphores and raii tokens. technically this would not have needed an event loop base to be doable, but it is a whole lot easier to wait for a token to be available if there is a callback mechanism ready for use that doesn't require a whole damn dedicated abstract method in Goal to work, and specific calls to that dedicated method strewn all over the worker implementation Change-Id: I1da7cf386d94e2bbf2dba9b53ff51dbce6a0cff7
Diffstat (limited to 'src/libstore/build')
-rw-r--r--src/libstore/build/derivation-goal.cc7
-rw-r--r--src/libstore/build/derivation-goal.hh2
-rw-r--r--src/libstore/build/drv-output-substitution-goal.cc12
-rw-r--r--src/libstore/build/drv-output-substitution-goal.hh2
-rw-r--r--src/libstore/build/goal.hh9
-rw-r--r--src/libstore/build/local-derivation-goal.cc9
-rw-r--r--src/libstore/build/substitution-goal.cc12
-rw-r--r--src/libstore/build/substitution-goal.hh2
-rw-r--r--src/libstore/build/worker.cc95
-rw-r--r--src/libstore/build/worker.hh32
10 files changed, 58 insertions, 124 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index c95092913..3c4257f08 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -134,9 +134,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex)
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
@@ -783,7 +783,7 @@ try {
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
- return {{WaitForWorld{std::move(a.promise), false}}};
+ return {{WaitForWorld{std::move(a.promise)}}};
},
[&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Not now; wait until at least one child finishes or
@@ -980,6 +980,7 @@ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot
try {
trace("build done");
+ slotToken = {};
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
cleanupPreChildKill();
diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh
index 46b07fc0b..d60bb0b4c 100644
--- a/src/libstore/build/derivation-goal.hh
+++ b/src/libstore/build/derivation-goal.hh
@@ -249,7 +249,7 @@ struct DerivationGoal : public Goal
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* Add wanted outputs to an already existing derivation goal.
diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc
index fdee53699..6ef00d1ff 100644
--- a/src/libstore/build/drv-output-substitution-goal.cc
+++ b/src/libstore/build/drv-output-substitution-goal.cc
@@ -42,7 +42,10 @@ try {
trace("trying next substituter");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -81,7 +84,7 @@ try {
state = &DrvOutputSubstitutionGoal::realisationFetched;
return {WaitForWorld{
- pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
}};
} catch (...) {
return {std::current_exception()};
@@ -90,6 +93,7 @@ try {
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
try {
maintainRunningSubstitutions.reset();
+ slotToken = {};
try {
outputInfo = downloadState->result.get();
@@ -168,9 +172,9 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh
index a35bf67ee..805b65bfa 100644
--- a/src/libstore/build/drv-output-substitution-goal.hh
+++ b/src/libstore/build/drv-output-substitution-goal.hh
@@ -76,7 +76,7 @@ public:
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index fbf767e8d..1ccf9716b 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "result.hh"
#include "types.hh"
#include "store-api.hh"
@@ -112,19 +113,20 @@ struct Goal
*/
BuildResult buildResult;
+protected:
+ AsyncSemaphore::Token slotToken;
+
public:
struct Finished;
struct [[nodiscard]] StillAlive {};
- struct [[nodiscard]] WaitForSlot {};
struct [[nodiscard]] ContinueImmediately {};
struct [[nodiscard]] WaitForGoals {
Goals goals;
};
struct [[nodiscard]] WaitForWorld {
kj::Promise<Outcome<void, Finished>> promise;
- bool inBuildSlot;
};
struct [[nodiscard]] Finished {
ExitCode exitCode;
@@ -138,7 +140,6 @@ public:
struct [[nodiscard]] WorkResult : std::variant<
StillAlive,
- WaitForSlot,
ContinueImmediately,
WaitForGoals,
WaitForWorld,
@@ -168,7 +169,7 @@ public:
trace("goal destroyed");
}
- virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
+ virtual kj::Promise<Result<WorkResult>> work() noexcept = 0;
virtual void waiteeDone(GoalPtr waitee) { }
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index 9ec87f1b6..2443cfb5a 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -156,8 +156,11 @@ try {
if (!inBuildSlot) {
state = &DerivationGoal::tryToBuild;
outputLocks.unlock();
- if (0U != settings.maxBuildJobs) {
- return {WaitForSlot{}};
+ if (worker.localBuilds.capacity() > 0) {
+ return worker.localBuilds.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
if (getMachines().empty()) {
throw Error(
@@ -248,7 +251,7 @@ try {
state = &DerivationGoal::buildDone;
started();
- return {WaitForWorld{std::move(promise), true}};
+ return {WaitForWorld{std::move(promise)}};
} catch (BuildError & e) {
outputLocks.unlock();
diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc
index 058f858d4..6d90196fa 100644
--- a/src/libstore/build/substitution-goal.cc
+++ b/src/libstore/build/substitution-goal.cc
@@ -45,9 +45,9 @@ Goal::Finished PathSubstitutionGoal::done(
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
@@ -203,7 +203,10 @@ try {
trace("trying to run");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -236,7 +239,7 @@ try {
state = &PathSubstitutionGoal::finished;
return {WaitForWorld{
- pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
}};
} catch (...) {
return {std::current_exception()};
@@ -248,6 +251,7 @@ try {
trace("substitute finished");
try {
+ slotToken = {};
thr.get();
} catch (std::exception & e) {
printError(e.what());
diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh
index 91e256fd7..cef3a4c5c 100644
--- a/src/libstore/build/substitution-goal.hh
+++ b/src/libstore/build/substitution-goal.hh
@@ -99,7 +99,7 @@ public:
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
}
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* The states.
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index 2cc2828b1..e19917d91 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -27,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
, children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
}
@@ -210,7 +212,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
std::visit(
overloaded{
[&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
[&](Goal::WaitForGoals & w) {
for (auto & dep : w.goals) {
@@ -219,19 +220,15 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
}
},
[&](Goal::WaitForWorld & w) {
- childStarted(
- goal,
- w.promise.then([](auto r) -> Result<Goal::WorkResult> {
- if (r.has_value()) {
- return {Goal::ContinueImmediately{}};
- } else if (r.has_error()) {
- return {std::move(r).error()};
- } else {
- return r.exception();
- }
- }),
- w.inBuildSlot
- );
+ childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> {
+ if (r.has_value()) {
+ return {Goal::ContinueImmediately{}};
+ } else if (r.has_error()) {
+ return {std::move(r).error()};
+ } else {
+ return r.exception();
+ }
+ }));
},
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
@@ -268,8 +265,7 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
- bool inBuildSlot)
+void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise)
{
children.add(promise
.then([this, goal](auto result) {
@@ -279,64 +275,17 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
childException = result.assume_error();
}
})
- .attach(Finally{[this, goal, inBuildSlot] {
- childTerminated(goal, inBuildSlot);
+ .attach(Finally{[this, goal] {
+ childTerminated(goal);
}}));
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
}
-void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
+void Worker::childTerminated(GoalPtr goal)
{
if (childFinished) {
childFinished->fulfill();
}
-
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
- }
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
}
@@ -394,16 +343,11 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
awake.clear();
for (auto & goal : awake2) {
checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- auto result = goal->work(inSlot);
+ auto result = goal->work();
if (result.poll(aio.waitScope)) {
handleWorkResult(goal, result.wait(aio.waitScope).value());
} else {
- childStarted(goal, std::move(result), false);
+ childStarted(goal, std::move(result));
}
if (topGoals.empty()) break; // stuff may have been cancelled
@@ -428,7 +372,6 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
assert(!settings.keepGoing || children.isEmpty());
return _topGoals;
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index daa612c06..834ecfda3 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "notifying-counter.hh"
#include "types.hh"
#include "lock.hh"
@@ -94,22 +95,6 @@ private:
WeakGoals awake;
/**
- * Goals waiting for a build slot.
- */
- WeakGoals wantingToBuild;
-
- /**
- * Number of build slots occupied. This includes local builds but does not
- * include substitutions or remote builds via the build hook.
- */
- unsigned int nrLocalBuilds;
-
- /**
- * Number of substitution slots occupied.
- */
- unsigned int nrSubstitutions;
-
- /**
* Maps used to prevent multiple instantiations of a goal for the
* same derivation / path.
*/
@@ -149,12 +134,6 @@ private:
kj::Own<kj::PromiseFulfiller<void>> childFinished;
/**
- * Put `goal` to sleep until a build slot becomes available (which
- * might be right away).
- */
- void waitForBuildSlot(GoalPtr goal);
-
- /**
* Wake up a goal (i.e., there is something for it to do).
*/
void wakeUp(GoalPtr goal);
@@ -170,16 +149,14 @@ private:
void removeGoal(GoalPtr goal);
/**
- * Registers a running child process. `inBuildSlot` means that
- * the process counts towards the jobs limit.
+ * Registers a running child process.
*/
- void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
- bool inBuildSlot);
+ void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise);
/**
* Unregisters a running child process.
*/
- void childTerminated(GoalPtr goal, bool inBuildSlot);
+ void childTerminated(GoalPtr goal);
/**
* Pass current stats counters to the logger for progress bar updates.
@@ -205,6 +182,7 @@ public:
Store & store;
Store & evalStore;
kj::AsyncIoContext & aio;
+ AsyncSemaphore substitutions, localBuilds;
private:
kj::TaskSet children;