aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build/worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r--src/libstore/build/worker.cc605
1 files changed, 175 insertions, 430 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..10f58f5d3 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -1,3 +1,4 @@
+#include "async-collect.hh"
#include "charptr-cast.hh"
#include "worker.hh"
#include "finally.hh"
@@ -6,11 +7,22 @@
#include "local-derivation-goal.hh"
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-
-#include <poll.h>
+#include <boost/outcome/try.hpp>
+#include <kj/vector.h>
namespace nix {
+namespace {
+struct ErrorHandler : kj::TaskSet::ErrorHandler
+{
+ void taskFailed(kj::Exception && e) override
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
+ abort();
+ }
+} errorHandler;
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,11 +30,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
- lastWokenUp = steady_time_point::min();
}
@@ -32,7 +46,11 @@ Worker::~Worker()
goals that refer to this worker should be gone. (Otherwise we
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
- topGoals.clear();
+ children.clear();
+
+ derivationGoals.clear();
+ drvOutputSubstitutionGoals.clear();
+ substitutionGoals.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -40,292 +58,158 @@ Worker::~Worker()
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
- const StorePath & drvPath,
- const OutputsSpec & wantedOutputs,
- std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal)
+template<typename ID, std::derived_from<Goal> G>
+std::pair<std::shared_ptr<G>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoalCommon(
+ std::map<ID, CachedGoal<G>> & map,
+ const ID & key,
+ InvocableR<std::unique_ptr<G>> auto create,
+ InvocableR<bool, G &> auto modify
+)
{
- std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath];
- std::shared_ptr<DerivationGoal> goal = goal_weak.lock();
- if (!goal) {
- goal = mkDrvGoal();
- goal_weak = goal;
- wakeUp(goal);
- } else {
- goal->addWantedOutputs(wantedOutputs);
+ auto [it, _inserted] = map.try_emplace(key);
+ // try twice to create the goal. we can only loop if we hit the continue,
+ // and then we only want to recreate the goal *once*. concurrent accesses
+ // to the worker are not sound, we want to catch them if at all possible.
+ for ([[maybe_unused]] auto _attempt : {1, 2}) {
+ auto & cachedGoal = it->second;
+ auto & goal = cachedGoal.goal;
+ if (!goal) {
+ goal = create();
+ // do not start working immediately. if we are not yet running we
+ // may create dependencies as though they were toplevel goals, in
+ // which case the dependencies will not report build errors. when
+ // we are running we may be called for this same goal more times,
+ // and then we want to modify rather than recreate when possible.
+ auto removeWhenDone = [goal, &map, it] {
+ // c++ lambda coroutine capture semantics are *so* fucked up.
+ return [](auto goal, auto & map, auto it) -> kj::Promise<Result<Goal::WorkResult>> {
+ auto result = co_await goal->work();
+ // a concurrent call to makeGoalCommon may have reset our
+ // cached goal and replaced it with a new instance. don't
+ // remove the goal in this case, otherwise we will crash.
+ if (goal == it->second.goal) {
+ map.erase(it);
+ }
+ co_return result;
+ }(goal, map, it);
+ };
+ cachedGoal.promise = kj::evalLater(std::move(removeWhenDone)).fork();
+ children.add(cachedGoal.promise.addBranch().then([this](auto _result) {
+ if (_result.has_value()) {
+ auto & result = _result.value();
+ permanentFailure |= result.permanentFailure;
+ timedOut |= result.timedOut;
+ hashMismatch |= result.hashMismatch;
+ checkMismatch |= result.checkMismatch;
+ }
+ }));
+ } else {
+ if (!modify(*goal)) {
+ cachedGoal = {};
+ continue;
+ }
+ }
+ return {goal, cachedGoal.promise.addBranch()};
}
- return goal;
+ assert(false && "could not make a goal. possible concurrent worker access");
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath,
- const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeDerivationGoal(
+ const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode
+)
{
- return makeDerivationGoalCommon(
+ return makeGoalCommon(
+ derivationGoals,
drvPath,
- wantedOutputs,
- [&]() -> std::shared_ptr<DerivationGoal> {
+ [&]() -> std::unique_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
- ? std::make_shared<DerivationGoal>(
+ ? std::make_unique<DerivationGoal>(
drvPath, wantedOutputs, *this, running, buildMode
)
: LocalDerivationGoal::makeLocalDerivationGoal(
drvPath, wantedOutputs, *this, running, buildMode
);
- }
+ },
+ [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); }
);
}
-std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath,
- const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeBasicDerivationGoal(
+ const StorePath & drvPath,
+ const BasicDerivation & drv,
+ const OutputsSpec & wantedOutputs,
+ BuildMode buildMode
+)
{
- return makeDerivationGoalCommon(
+ return makeGoalCommon(
+ derivationGoals,
drvPath,
- wantedOutputs,
- [&]() -> std::shared_ptr<DerivationGoal> {
+ [&]() -> std::unique_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
- ? std::make_shared<DerivationGoal>(
+ ? std::make_unique<DerivationGoal>(
drvPath, drv, wantedOutputs, *this, running, buildMode
)
: LocalDerivationGoal::makeLocalDerivationGoal(
drvPath, drv, wantedOutputs, *this, running, buildMode
);
- }
+ },
+ [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); }
);
}
-std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+Worker::makePathSubstitutionGoal(
+ const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path];
- auto goal = goal_weak.lock(); // FIXME
- if (!goal) {
- goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca);
- goal_weak = goal;
- wakeUp(goal);
- }
- return goal;
+ return makeGoalCommon(
+ substitutionGoals,
+ path,
+ [&] { return std::make_unique<PathSubstitutionGoal>(path, *this, running, repair, ca); },
+ [&](auto &) { return true; }
+ );
}
-std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+Worker::makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id];
- auto goal = goal_weak.lock(); // FIXME
- if (!goal) {
- goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca);
- goal_weak = goal;
- wakeUp(goal);
- }
- return goal;
+ return makeGoalCommon(
+ drvOutputSubstitutionGoals,
+ id,
+ [&] { return std::make_unique<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); },
+ [&](auto &) { return true; }
+ );
}
-GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
+std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{
return std::visit(overloaded {
- [&](const DerivedPath::Built & bfd) -> GoalPtr {
+ [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> {
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
},
- [&](const DerivedPath::Opaque & bo) -> GoalPtr {
+ [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
},
}, req.raw());
}
+kj::Promise<Result<Worker::Results>> Worker::updateStatistics()
+try {
+ while (true) {
+ statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire();
-template<typename K, typename G>
-static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap)
-{
- /* !!! inefficient */
- for (auto i = goalMap.begin();
- i != goalMap.end(); )
- if (i->second.lock() == goal) {
- auto j = i; ++j;
- goalMap.erase(i);
- i = j;
- }
- else ++i;
-}
-
-
-void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
-{
- goal->trace("done");
- assert(!goal->exitCode.has_value());
- goal->exitCode = f.exitCode;
- goal->ex = f.ex;
-
- permanentFailure |= f.permanentFailure;
- timedOut |= f.timedOut;
- hashMismatch |= f.hashMismatch;
- checkMismatch |= f.checkMismatch;
-
- for (auto & i : goal->waiters) {
- if (GoalPtr waiting = i.lock()) {
- assert(waiting->waitees.count(goal));
- waiting->waitees.erase(goal);
-
- waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size()));
-
- if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed;
- if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters;
- if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure;
-
- if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) {
- /* If we failed and keepGoing is not set, we remove all
- remaining waitees. */
- for (auto & i : waiting->waitees) {
- i->waiters.extract(waiting);
- }
- waiting->waitees.clear();
-
- wakeUp(waiting);
- }
-
- waiting->waiteeDone(goal);
- }
- }
- goal->waiters.clear();
- removeGoal(goal);
- goal->cleanup();
-}
-
-void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
-{
- std::visit(
- overloaded{
- [&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
- [&](Goal::WaitForAWhile) { waitForAWhile(goal); },
- [&](Goal::ContinueImmediately) { wakeUp(goal); },
- [&](Goal::WaitForGoals & w) {
- for (auto & dep : w.goals) {
- goal->waitees.insert(dep);
- dep->waiters.insert(goal);
- }
- },
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
- [&](Goal::Finished & f) { goalFinished(goal, f); },
- },
- how
- );
-}
-
-void Worker::removeGoal(GoalPtr goal)
-{
- if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal))
- nix::removeGoal(drvGoal, derivationGoals);
- else if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal))
- nix::removeGoal(subGoal, substitutionGoals);
- else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal))
- nix::removeGoal(subGoal, drvOutputSubstitutionGoals);
- else
- assert(false);
-
- if (topGoals.find(goal) != topGoals.end()) {
- topGoals.erase(goal);
- /* If a top-level goal failed, then kill all other goals
- (unless keepGoing was set). */
- if (goal->exitCode == Goal::ecFailed && !settings.keepGoing)
- topGoals.clear();
- }
-}
-
-
-void Worker::wakeUp(GoalPtr goal)
-{
- goal->trace("woken up");
- awake.insert(goal);
-}
-
-
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot)
-{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
-}
-
-
-void Worker::childTerminated(Goal * goal)
-{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
-
- if (i->inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
- }
-
- children.erase(i);
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
-}
-
-
-void Worker::waitForAWhile(GoalPtr goal)
-{
- debug("wait for a while");
- waitingForAWhile.insert(goal);
-}
-
-
-void Worker::updateStatistics()
-{
- // only update progress info while running. this notably excludes updating
- // progress info while destroying, which causes the progress bar to assert
- if (running && statisticsOutdated) {
+ // only update progress info while running. this notably excludes updating
+ // progress info while destroying, which causes the progress bar to assert
actDerivations.progress(
doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds
);
@@ -338,221 +222,82 @@ void Worker::updateStatistics()
act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
- statisticsOutdated = false;
+ // limit to 50fps. that should be more than good enough for anything we do
+ co_await aio.provider->getTimer().afterDelay(20 * kj::MILLISECONDS);
}
+} catch (...) {
+ co_return result::failure(std::current_exception());
}
-Goals Worker::run(std::function<Goals (GoalFactory &)> req)
+Worker::Results Worker::run(std::function<Targets (GoalFactory &)> req)
{
- auto _topGoals = req(goalFactory());
+ auto topGoals = req(goalFactory());
assert(!running);
running = true;
Finally const _stop([&] { running = false; });
- updateStatistics();
+ auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<Results>>();
+ auto interruptCallback = createInterruptCallback([&] {
+ onInterrupt.fulfiller->fulfill(result::failure(std::make_exception_ptr(makeInterrupted())));
+ });
- topGoals = _topGoals;
+ auto promise = runImpl(std::move(topGoals))
+ .exclusiveJoin(updateStatistics())
+ .exclusiveJoin(std::move(onInterrupt.promise));
- debug("entered goal loop");
+ // TODO GC interface?
+ if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0u) {
+ // Periodically wake up to see if we need to run the garbage collector.
+ promise = promise.exclusiveJoin(boopGC(*localStore));
+ }
- while (1) {
+ return promise.wait(aio.waitScope).value();
+}
- checkInterrupt();
+kj::Promise<Result<Worker::Results>> Worker::runImpl(Targets topGoals)
+try {
+ debug("entered goal loop");
- // TODO GC interface?
- if (auto localStore = dynamic_cast<LocalStore *>(&store))
- localStore->autoGC(false);
+ kj::Vector<Targets::value_type> promises(topGoals.size());
+ for (auto & gp : topGoals) {
+ promises.add(std::move(gp));
+ }
- /* Call every wake goal (in the ordering established by
- CompareGoalPtrs). */
- while (!awake.empty() && !topGoals.empty()) {
- Goals awake2;
- for (auto & i : awake) {
- GoalPtr goal = i.lock();
- if (goal) awake2.insert(goal);
- }
- awake.clear();
- for (auto & goal : awake2) {
- checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
- updateStatistics();
-
- if (topGoals.empty()) break; // stuff may have been cancelled
- }
- }
+ Results results;
- if (topGoals.empty()) break;
+ auto collect = AsyncCollect(promises.releaseAsArray());
+ while (auto done = co_await collect.next()) {
+ // propagate goal exceptions outward
+ BOOST_OUTCOME_CO_TRY(auto result, done->second);
+ results.emplace(done->first, result);
- /* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
- waitForInput();
- else {
- assert(!awake.empty());
+ /* If a top-level goal failed, then kill all other goals
+ (unless keepGoing was set). */
+ if (result.exitCode == Goal::ecFailed && !settings.keepGoing) {
+ children.clear();
+ break;
}
}
/* If --keep-going is not set, it's possible that the main goal
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
- assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
- return _topGoals;
+ co_return std::move(results);
+} catch (...) {
+ co_return result::failure(std::current_exception());
}
-void Worker::waitForInput()
-{
- printMsg(lvlVomit, "waiting for children");
-
- /* Process output from the file descriptors attached to the
- children, namely log output and output path creation commands.
- We also use this to detect child termination: if we get EOF on
- the logger pipe of a build, we assume that the builder has
- terminated. */
-
- bool useTimeout = false;
- long timeout = 0;
- auto before = steady_time_point::clock::now();
-
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
- }
-
- /* If we are polling goals that are waiting for a lock, then wake
- up after a few seconds at most. */
- if (!waitingForAWhile.empty()) {
- useTimeout = true;
- if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
- timeout = std::max(1L,
- (long) std::chrono::duration_cast<std::chrono::seconds>(
- lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
- } else lastWokenUp = steady_time_point::min();
-
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
-
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
-
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
- }
-
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
-
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
- }
-
- if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
- lastWokenUp = after;
- for (auto & i : waitingForAWhile) {
- GoalPtr goal = i.lock();
- if (goal) wakeUp(goal);
- }
- waitingForAWhile.clear();
+kj::Promise<Result<Worker::Results>> Worker::boopGC(LocalStore & localStore)
+try {
+ while (true) {
+ co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS);
+ localStore.autoGC(false);
}
+} catch (...) {
+ co_return result::failure(std::current_exception());
}