aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/build/derivation-goal.cc7
-rw-r--r--src/libstore/build/derivation-goal.hh2
-rw-r--r--src/libstore/build/drv-output-substitution-goal.cc12
-rw-r--r--src/libstore/build/drv-output-substitution-goal.hh2
-rw-r--r--src/libstore/build/goal.hh9
-rw-r--r--src/libstore/build/local-derivation-goal.cc9
-rw-r--r--src/libstore/build/substitution-goal.cc12
-rw-r--r--src/libstore/build/substitution-goal.hh2
-rw-r--r--src/libstore/build/worker.cc95
-rw-r--r--src/libstore/build/worker.hh32
10 files changed, 58 insertions, 124 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index c95092913..3c4257f08 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -134,9 +134,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex)
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
@@ -783,7 +783,7 @@ try {
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
- return {{WaitForWorld{std::move(a.promise), false}}};
+ return {{WaitForWorld{std::move(a.promise)}}};
},
[&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Not now; wait until at least one child finishes or
@@ -980,6 +980,7 @@ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot
try {
trace("build done");
+ slotToken = {};
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
cleanupPreChildKill();
diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh
index 46b07fc0b..d60bb0b4c 100644
--- a/src/libstore/build/derivation-goal.hh
+++ b/src/libstore/build/derivation-goal.hh
@@ -249,7 +249,7 @@ struct DerivationGoal : public Goal
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* Add wanted outputs to an already existing derivation goal.
diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc
index fdee53699..6ef00d1ff 100644
--- a/src/libstore/build/drv-output-substitution-goal.cc
+++ b/src/libstore/build/drv-output-substitution-goal.cc
@@ -42,7 +42,10 @@ try {
trace("trying next substituter");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -81,7 +84,7 @@ try {
state = &DrvOutputSubstitutionGoal::realisationFetched;
return {WaitForWorld{
- pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
}};
} catch (...) {
return {std::current_exception()};
@@ -90,6 +93,7 @@ try {
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
try {
maintainRunningSubstitutions.reset();
+ slotToken = {};
try {
outputInfo = downloadState->result.get();
@@ -168,9 +172,9 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh
index a35bf67ee..805b65bfa 100644
--- a/src/libstore/build/drv-output-substitution-goal.hh
+++ b/src/libstore/build/drv-output-substitution-goal.hh
@@ -76,7 +76,7 @@ public:
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index fbf767e8d..1ccf9716b 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "result.hh"
#include "types.hh"
#include "store-api.hh"
@@ -112,19 +113,20 @@ struct Goal
*/
BuildResult buildResult;
+protected:
+ AsyncSemaphore::Token slotToken;
+
public:
struct Finished;
struct [[nodiscard]] StillAlive {};
- struct [[nodiscard]] WaitForSlot {};
struct [[nodiscard]] ContinueImmediately {};
struct [[nodiscard]] WaitForGoals {
Goals goals;
};
struct [[nodiscard]] WaitForWorld {
kj::Promise<Outcome<void, Finished>> promise;
- bool inBuildSlot;
};
struct [[nodiscard]] Finished {
ExitCode exitCode;
@@ -138,7 +140,6 @@ public:
struct [[nodiscard]] WorkResult : std::variant<
StillAlive,
- WaitForSlot,
ContinueImmediately,
WaitForGoals,
WaitForWorld,
@@ -168,7 +169,7 @@ public:
trace("goal destroyed");
}
- virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
+ virtual kj::Promise<Result<WorkResult>> work() noexcept = 0;
virtual void waiteeDone(GoalPtr waitee) { }
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index 9ec87f1b6..2443cfb5a 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -156,8 +156,11 @@ try {
if (!inBuildSlot) {
state = &DerivationGoal::tryToBuild;
outputLocks.unlock();
- if (0U != settings.maxBuildJobs) {
- return {WaitForSlot{}};
+ if (worker.localBuilds.capacity() > 0) {
+ return worker.localBuilds.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
if (getMachines().empty()) {
throw Error(
@@ -248,7 +251,7 @@ try {
state = &DerivationGoal::buildDone;
started();
- return {WaitForWorld{std::move(promise), true}};
+ return {WaitForWorld{std::move(promise)}};
} catch (BuildError & e) {
outputLocks.unlock();
diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc
index 058f858d4..6d90196fa 100644
--- a/src/libstore/build/substitution-goal.cc
+++ b/src/libstore/build/substitution-goal.cc
@@ -45,9 +45,9 @@ Goal::Finished PathSubstitutionGoal::done(
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
@@ -203,7 +203,10 @@ try {
trace("trying to run");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -236,7 +239,7 @@ try {
state = &PathSubstitutionGoal::finished;
return {WaitForWorld{
- pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
}};
} catch (...) {
return {std::current_exception()};
@@ -248,6 +251,7 @@ try {
trace("substitute finished");
try {
+ slotToken = {};
thr.get();
} catch (std::exception & e) {
printError(e.what());
diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh
index 91e256fd7..cef3a4c5c 100644
--- a/src/libstore/build/substitution-goal.hh
+++ b/src/libstore/build/substitution-goal.hh
@@ -99,7 +99,7 @@ public:
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
}
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* The states.
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index 2cc2828b1..e19917d91 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -27,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
, children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
}
@@ -210,7 +212,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
std::visit(
overloaded{
[&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
[&](Goal::WaitForGoals & w) {
for (auto & dep : w.goals) {
@@ -219,19 +220,15 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
}
},
[&](Goal::WaitForWorld & w) {
- childStarted(
- goal,
- w.promise.then([](auto r) -> Result<Goal::WorkResult> {
- if (r.has_value()) {
- return {Goal::ContinueImmediately{}};
- } else if (r.has_error()) {
- return {std::move(r).error()};
- } else {
- return r.exception();
- }
- }),
- w.inBuildSlot
- );
+ childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> {
+ if (r.has_value()) {
+ return {Goal::ContinueImmediately{}};
+ } else if (r.has_error()) {
+ return {std::move(r).error()};
+ } else {
+ return r.exception();
+ }
+ }));
},
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
@@ -268,8 +265,7 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
- bool inBuildSlot)
+void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise)
{
children.add(promise
.then([this, goal](auto result) {
@@ -279,64 +275,17 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
childException = result.assume_error();
}
})
- .attach(Finally{[this, goal, inBuildSlot] {
- childTerminated(goal, inBuildSlot);
+ .attach(Finally{[this, goal] {
+ childTerminated(goal);
}}));
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
}
-void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
+void Worker::childTerminated(GoalPtr goal)
{
if (childFinished) {
childFinished->fulfill();
}
-
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
- }
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
}
@@ -394,16 +343,11 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
awake.clear();
for (auto & goal : awake2) {
checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- auto result = goal->work(inSlot);
+ auto result = goal->work();
if (result.poll(aio.waitScope)) {
handleWorkResult(goal, result.wait(aio.waitScope).value());
} else {
- childStarted(goal, std::move(result), false);
+ childStarted(goal, std::move(result));
}
if (topGoals.empty()) break; // stuff may have been cancelled
@@ -428,7 +372,6 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
assert(!settings.keepGoing || children.isEmpty());
return _topGoals;
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index daa612c06..834ecfda3 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "notifying-counter.hh"
#include "types.hh"
#include "lock.hh"
@@ -94,22 +95,6 @@ private:
WeakGoals awake;
/**
- * Goals waiting for a build slot.
- */
- WeakGoals wantingToBuild;
-
- /**
- * Number of build slots occupied. This includes local builds but does not
- * include substitutions or remote builds via the build hook.
- */
- unsigned int nrLocalBuilds;
-
- /**
- * Number of substitution slots occupied.
- */
- unsigned int nrSubstitutions;
-
- /**
* Maps used to prevent multiple instantiations of a goal for the
* same derivation / path.
*/
@@ -149,12 +134,6 @@ private:
kj::Own<kj::PromiseFulfiller<void>> childFinished;
/**
- * Put `goal` to sleep until a build slot becomes available (which
- * might be right away).
- */
- void waitForBuildSlot(GoalPtr goal);
-
- /**
* Wake up a goal (i.e., there is something for it to do).
*/
void wakeUp(GoalPtr goal);
@@ -170,16 +149,14 @@ private:
void removeGoal(GoalPtr goal);
/**
- * Registers a running child process. `inBuildSlot` means that
- * the process counts towards the jobs limit.
+ * Registers a running child process.
*/
- void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
- bool inBuildSlot);
+ void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise);
/**
* Unregisters a running child process.
*/
- void childTerminated(GoalPtr goal, bool inBuildSlot);
+ void childTerminated(GoalPtr goal);
/**
* Pass current stats counters to the logger for progress bar updates.
@@ -205,6 +182,7 @@ public:
Store & store;
Store & evalStore;
kj::AsyncIoContext & aio;
+ AsyncSemaphore substitutions, localBuilds;
private:
kj::TaskSet children;