aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build/worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r--src/libstore/build/worker.cc398
1 files changed, 114 insertions, 284 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..68071a94c 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -7,10 +7,19 @@
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-#include <poll.h>
-
namespace nix {
+namespace {
+struct ErrorHandler : kj::TaskSet::ErrorHandler
+{
+ void taskFailed(kj::Exception && e) override
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
+ abort();
+ }
+} errorHandler;
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
- lastWokenUp = steady_time_point::min();
}
@@ -33,6 +44,7 @@ Worker::~Worker()
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
topGoals.clear();
+ children.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -40,26 +52,28 @@ Worker::~Worker()
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoalCommon(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs,
std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal)
{
- std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath];
- std::shared_ptr<DerivationGoal> goal = goal_weak.lock();
+ auto & goal_weak = derivationGoals[drvPath];
+ std::shared_ptr<DerivationGoal> goal = goal_weak.goal.lock();
if (!goal) {
goal = mkDrvGoal();
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
} else {
goal->addWantedOutputs(wantedOutputs);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath,
- const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoal(
+ const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode
+)
{
return makeDerivationGoalCommon(
drvPath,
@@ -77,8 +91,12 @@ std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drv
}
-std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath,
- const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeBasicDerivationGoal(
+ const StorePath & drvPath,
+ const BasicDerivation & drv,
+ const OutputsSpec & wantedOutputs,
+ BuildMode buildMode
+)
{
return makeDerivationGoalCommon(
drvPath,
@@ -96,55 +114,63 @@ std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath
}
-std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>>
+Worker::makePathSubstitutionGoal(
+ const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path];
- auto goal = goal_weak.lock(); // FIXME
+ auto & goal_weak = substitutionGoals[path];
+ auto goal = goal_weak.goal.lock(); // FIXME
if (!goal) {
goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca);
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>>
+Worker::makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id];
- auto goal = goal_weak.lock(); // FIXME
+ auto & goal_weak = drvOutputSubstitutionGoals[id];
+ auto goal = goal_weak.goal.lock(); // FIXME
if (!goal) {
goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca);
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
+std::pair<GoalPtr, kj::Promise<void>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{
return std::visit(overloaded {
- [&](const DerivedPath::Built & bfd) -> GoalPtr {
+ [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<void>> {
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
},
- [&](const DerivedPath::Opaque & bo) -> GoalPtr {
+ [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<void>> {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
},
}, req.raw());
}
-template<typename K, typename G>
-static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap)
+template<typename G>
+static void removeGoal(std::shared_ptr<G> goal, auto & goalMap)
{
/* !!! inefficient */
for (auto i = goalMap.begin();
i != goalMap.end(); )
- if (i->second.lock() == goal) {
+ if (i->second.goal.lock() == goal) {
auto j = i; ++j;
goalMap.erase(i);
i = j;
@@ -165,33 +191,8 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
hashMismatch |= f.hashMismatch;
checkMismatch |= f.checkMismatch;
- for (auto & i : goal->waiters) {
- if (GoalPtr waiting = i.lock()) {
- assert(waiting->waitees.count(goal));
- waiting->waitees.erase(goal);
-
- waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size()));
-
- if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed;
- if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters;
- if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure;
-
- if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) {
- /* If we failed and keepGoing is not set, we remove all
- remaining waitees. */
- for (auto & i : waiting->waitees) {
- i->waiters.extract(waiting);
- }
- waiting->waitees.clear();
-
- wakeUp(waiting);
- }
-
- waiting->waiteeDone(goal);
- }
- }
- goal->waiters.clear();
removeGoal(goal);
+ goal->notify->fulfill();
goal->cleanup();
}
@@ -200,20 +201,23 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
std::visit(
overloaded{
[&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
- [&](Goal::WaitForAWhile) { waitForAWhile(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
- [&](Goal::WaitForGoals & w) {
- for (auto & dep : w.goals) {
- goal->waitees.insert(dep);
- dep->waiters.insert(goal);
- }
+ [&](Goal::WaitForWorld & w) {
+ childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> {
+ if (r.has_value()) {
+ return {Goal::ContinueImmediately{}};
+ } else if (r.has_error()) {
+ return {std::move(r).error()};
+ } else {
+ return r.exception();
+ }
+ }));
},
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
);
+ updateStatistics();
}
void Worker::removeGoal(GoalPtr goal)
@@ -244,80 +248,27 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot)
+void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise)
{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
+ children.add(promise
+ .then([this, goal](auto result) {
+ if (result.has_value()) {
+ handleWorkResult(goal, std::move(result.assume_value()));
+ } else {
+ childException = result.assume_error();
+ }
+ })
+ .attach(Finally{[this, goal] {
+ childTerminated(goal);
+ }}));
}
-void Worker::childTerminated(Goal * goal)
+void Worker::childTerminated(GoalPtr goal)
{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
-
- if (i->inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
+ if (childFinished) {
+ childFinished->fulfill();
}
-
- children.erase(i);
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
-}
-
-
-void Worker::waitForAWhile(GoalPtr goal)
-{
- debug("wait for a while");
- waitingForAWhile.insert(goal);
}
@@ -342,7 +293,7 @@ void Worker::updateStatistics()
}
}
-Goals Worker::run(std::function<Goals (GoalFactory &)> req)
+std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
{
auto _topGoals = req(goalFactory());
@@ -352,7 +303,10 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
updateStatistics();
- topGoals = _topGoals;
+ topGoals.clear();
+ for (auto & [goal, _promise] : _topGoals) {
+ topGoals.insert(goal);
+ }
debug("entered goal loop");
@@ -375,13 +329,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
awake.clear();
for (auto & goal : awake2) {
checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
- updateStatistics();
+ auto result = goal->work();
+ if (result.poll(aio.waitScope)) {
+ handleWorkResult(goal, result.wait(aio.waitScope).value());
+ } else {
+ childStarted(goal, std::move(result));
+ }
if (topGoals.empty()) break; // stuff may have been cancelled
}
@@ -390,169 +343,46 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
+ if (!children.isEmpty())
waitForInput();
else {
assert(!awake.empty());
}
+
+ if (childException) {
+ std::rethrow_exception(childException);
+ }
}
/* If --keep-going is not set, it's possible that the main goal
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
- return _topGoals;
+ std::vector<GoalPtr> results;
+ for (auto & [i, _p] : _topGoals) {
+ results.push_back(i);
+ }
+ return results;
}
void Worker::waitForInput()
{
printMsg(lvlVomit, "waiting for children");
- /* Process output from the file descriptors attached to the
- children, namely log output and output path creation commands.
- We also use this to detect child termination: if we get EOF on
- the logger pipe of a build, we assume that the builder has
- terminated. */
-
- bool useTimeout = false;
- long timeout = 0;
- auto before = steady_time_point::clock::now();
-
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
- }
-
- /* If we are polling goals that are waiting for a lock, then wake
- up after a few seconds at most. */
- if (!waitingForAWhile.empty()) {
- useTimeout = true;
- if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
- timeout = std::max(1L,
- (long) std::chrono::duration_cast<std::chrono::seconds>(
- lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
- } else lastWokenUp = steady_time_point::min();
-
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
+ auto waitFor = [&]{
+ auto pair = kj::newPromiseAndFulfiller<void>();
+ this->childFinished = kj::mv(pair.fulfiller);
+ return kj::mv(pair.promise);
+ }();
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
-
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
- }
-
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
-
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
+ if (settings.minFree.get() != 0) {
+ // Periodicallty wake up to see if we need to run the garbage collector.
+ waitFor = waitFor.exclusiveJoin(aio.provider->getTimer().afterDelay(10 * kj::SECONDS));
}
- if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
- lastWokenUp = after;
- for (auto & i : waitingForAWhile) {
- GoalPtr goal = i.lock();
- if (goal) wakeUp(goal);
- }
- waitingForAWhile.clear();
- }
+ waitFor.wait(aio.waitScope);
}