diff options
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r-- | src/libstore/build/worker.cc | 398 |
1 files changed, 114 insertions, 284 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..68071a94c 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -7,10 +7,19 @@ #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep -#include <poll.h> - namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + /* Make sure that we are always allowed to run at least one substitution. + This prevents infinite waiting. */ + , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs)) + , localBuilds(settings.maxBuildJobs) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ - nrLocalBuilds = 0; - nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -33,6 +44,7 @@ Worker::~Worker() are in trouble, since goals may call childTerminated() etc. in their destructors). */ topGoals.clear(); + children.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -40,26 +52,28 @@ Worker::~Worker() } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon( +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoalCommon( const StorePath & drvPath, const OutputsSpec & wantedOutputs, std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal) { - std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath]; - std::shared_ptr<DerivationGoal> goal = goal_weak.lock(); + auto & goal_weak = derivationGoals[drvPath]; + std::shared_ptr<DerivationGoal> goal = goal_weak.goal.lock(); if (!goal) { goal = mkDrvGoal(); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } else { goal->addWantedOutputs(wantedOutputs); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath, - const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoal( + const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode +) { return makeDerivationGoalCommon( drvPath, @@ -77,8 +91,12 @@ std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drv } -std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath, - const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeBasicDerivationGoal( + const StorePath & drvPath, + const BasicDerivation & drv, + const OutputsSpec & wantedOutputs, + BuildMode buildMode +) { return makeDerivationGoalCommon( drvPath, @@ -96,55 +114,63 @@ std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath } -std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> +Worker::makePathSubstitutionGoal( + const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path]; - auto goal = goal_weak.lock(); // FIXME + auto & goal_weak = substitutionGoals[path]; + auto goal = goal_weak.goal.lock(); // FIXME if (!goal) { goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> +Worker::makeDrvOutputSubstitutionGoal( + const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id]; - auto goal = goal_weak.lock(); // FIXME + auto & goal_weak = drvOutputSubstitutionGoals[id]; + auto goal = goal_weak.goal.lock(); // FIXME if (!goal) { goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) +std::pair<GoalPtr, kj::Promise<void>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) { return std::visit(overloaded { - [&](const DerivedPath::Built & bfd) -> GoalPtr { + [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<void>> { if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath)) return makeDerivationGoal(bop->path, bfd.outputs, buildMode); else throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); }, - [&](const DerivedPath::Opaque & bo) -> GoalPtr { + [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<void>> { return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); }, }, req.raw()); } -template<typename K, typename G> -static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap) +template<typename G> +static void removeGoal(std::shared_ptr<G> goal, auto & goalMap) { /* !!! inefficient */ for (auto i = goalMap.begin(); i != goalMap.end(); ) - if (i->second.lock() == goal) { + if (i->second.goal.lock() == goal) { auto j = i; ++j; goalMap.erase(i); i = j; @@ -165,33 +191,8 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f) hashMismatch |= f.hashMismatch; checkMismatch |= f.checkMismatch; - for (auto & i : goal->waiters) { - if (GoalPtr waiting = i.lock()) { - assert(waiting->waitees.count(goal)); - waiting->waitees.erase(goal); - - waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size())); - - if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed; - if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters; - if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure; - - if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) { - /* If we failed and keepGoing is not set, we remove all - remaining waitees. */ - for (auto & i : waiting->waitees) { - i->waiters.extract(waiting); - } - waiting->waitees.clear(); - - wakeUp(waiting); - } - - waiting->waiteeDone(goal); - } - } - goal->waiters.clear(); removeGoal(goal); + goal->notify->fulfill(); goal->cleanup(); } @@ -200,20 +201,23 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) std::visit( overloaded{ [&](Goal::StillAlive) {}, - [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, [&](Goal::ContinueImmediately) { wakeUp(goal); }, - [&](Goal::WaitForGoals & w) { - for (auto & dep : w.goals) { - goal->waitees.insert(dep); - dep->waiters.insert(goal); - } + [&](Goal::WaitForWorld & w) { + childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> { + if (r.has_value()) { + return {Goal::ContinueImmediately{}}; + } else if (r.has_error()) { + return {std::move(r).error()}; + } else { + return r.exception(); + } + })); }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how ); + updateStatistics(); } void Worker::removeGoal(GoalPtr goal) @@ -244,80 +248,27 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot) +void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise) { - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - nrSubstitutions++; - break; - case JobCategory::Build: - nrLocalBuilds++; - break; - default: - abort(); - } - } + children.add(promise + .then([this, goal](auto result) { + if (result.has_value()) { + handleWorkResult(goal, std::move(result.assume_value())); + } else { + childException = result.assume_error(); + } + }) + .attach(Finally{[this, goal] { + childTerminated(goal); + }})); } -void Worker::childTerminated(Goal * goal) +void Worker::childTerminated(GoalPtr goal) { - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; - - if (i->inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - assert(nrSubstitutions > 0); - nrSubstitutions--; - break; - case JobCategory::Build: - assert(nrLocalBuilds > 0); - nrLocalBuilds--; - break; - default: - abort(); - } + if (childFinished) { + childFinished->fulfill(); } - - children.erase(i); - - /* Wake up goals waiting for a build slot. */ - for (auto & j : wantingToBuild) { - GoalPtr goal = j.lock(); - if (goal) wakeUp(goal); - } - - wantingToBuild.clear(); -} - - -void Worker::waitForBuildSlot(GoalPtr goal) -{ - goal->trace("wait for build slot"); - bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution; - if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) || - (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs)) - wakeUp(goal); /* we can do it right away */ - else - wantingToBuild.insert(goal); -} - - -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); } @@ -342,7 +293,7 @@ void Worker::updateStatistics() } } -Goals Worker::run(std::function<Goals (GoalFactory &)> req) +std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req) { auto _topGoals = req(goalFactory()); @@ -352,7 +303,10 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) updateStatistics(); - topGoals = _topGoals; + topGoals.clear(); + for (auto & [goal, _promise] : _topGoals) { + topGoals.insert(goal); + } debug("entered goal loop"); @@ -375,13 +329,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) awake.clear(); for (auto & goal : awake2) { checkInterrupt(); - /* Make sure that we are always allowed to run at least one substitution. - This prevents infinite waiting. */ - const bool inSlot = goal->jobCategory() == JobCategory::Substitution - ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) - : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); + auto result = goal->work(); + if (result.poll(aio.waitScope)) { + handleWorkResult(goal, result.wait(aio.waitScope).value()); + } else { + childStarted(goal, std::move(result)); + } if (topGoals.empty()) break; // stuff may have been cancelled } @@ -390,169 +343,46 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) + if (!children.isEmpty()) waitForInput(); else { assert(!awake.empty()); } + + if (childException) { + std::rethrow_exception(childException); + } } /* If --keep-going is not set, it's possible that the main goal exited while some of its subgoals were still active. But if --keep-going *is* set, then they must all be finished now. */ assert(!settings.keepGoing || awake.empty()); - assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); - return _topGoals; + std::vector<GoalPtr> results; + for (auto & [i, _p] : _topGoals) { + results.push_back(i); + } + return results; } void Worker::waitForInput() { printMsg(lvlVomit, "waiting for children"); - /* Process output from the file descriptors attached to the - children, namely log output and output path creation commands. - We also use this to detect child termination: if we get EOF on - the logger pipe of a build, we assume that the builder has - terminated. */ - - bool useTimeout = false; - long timeout = 0; - auto before = steady_time_point::clock::now(); - - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count()); - useTimeout = true; - } - - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - useTimeout = true; - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast<std::chrono::seconds>( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - - if (useTimeout) - vomit("sleeping %d seconds", timeout); - - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector<struct pollfd> pollStatus; - std::map<int, size_t> fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; - } - } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } + auto waitFor = [&]{ + auto pair = kj::newPromiseAndFulfiller<void>(); + this->childFinished = kj::mv(pair.fulfiller); + return kj::mv(pair.promise); + }(); - auto after = steady_time_point::clock::now(); - - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); - - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; - } - - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } - - std::set<int> fds2(j->fds); - std::vector<unsigned char> buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast<char *>(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } + if (settings.minFree.get() != 0) { + // Periodicallty wake up to see if we need to run the garbage collector. + waitFor = waitFor.exclusiveJoin(aio.provider->getTimer().afterDelay(10 * kj::SECONDS)); } - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); - } + waitFor.wait(aio.waitScope); } |