diff options
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r-- | src/libstore/build/worker.cc | 605 |
1 files changed, 175 insertions, 430 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..10f58f5d3 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -1,3 +1,4 @@ +#include "async-collect.hh" #include "charptr-cast.hh" #include "worker.hh" #include "finally.hh" @@ -6,11 +7,22 @@ #include "local-derivation-goal.hh" #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep - -#include <poll.h> +#include <boost/outcome/try.hpp> +#include <kj/vector.h> namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,11 +30,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + /* Make sure that we are always allowed to run at least one substitution. + This prevents infinite waiting. */ + , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs)) + , localBuilds(settings.maxBuildJobs) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ - nrLocalBuilds = 0; - nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -32,7 +46,11 @@ Worker::~Worker() goals that refer to this worker should be gone. (Otherwise we are in trouble, since goals may call childTerminated() etc. in their destructors). */ - topGoals.clear(); + children.clear(); + + derivationGoals.clear(); + drvOutputSubstitutionGoals.clear(); + substitutionGoals.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -40,292 +58,158 @@ Worker::~Worker() } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon( - const StorePath & drvPath, - const OutputsSpec & wantedOutputs, - std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal) +template<typename ID, std::derived_from<Goal> G> +std::pair<std::shared_ptr<G>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoalCommon( + std::map<ID, CachedGoal<G>> & map, + const ID & key, + InvocableR<std::unique_ptr<G>> auto create, + InvocableR<bool, G &> auto modify +) { - std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath]; - std::shared_ptr<DerivationGoal> goal = goal_weak.lock(); - if (!goal) { - goal = mkDrvGoal(); - goal_weak = goal; - wakeUp(goal); - } else { - goal->addWantedOutputs(wantedOutputs); + auto [it, _inserted] = map.try_emplace(key); + // try twice to create the goal. we can only loop if we hit the continue, + // and then we only want to recreate the goal *once*. concurrent accesses + // to the worker are not sound, we want to catch them if at all possible. + for ([[maybe_unused]] auto _attempt : {1, 2}) { + auto & cachedGoal = it->second; + auto & goal = cachedGoal.goal; + if (!goal) { + goal = create(); + // do not start working immediately. if we are not yet running we + // may create dependencies as though they were toplevel goals, in + // which case the dependencies will not report build errors. when + // we are running we may be called for this same goal more times, + // and then we want to modify rather than recreate when possible. + auto removeWhenDone = [goal, &map, it] { + // c++ lambda coroutine capture semantics are *so* fucked up. + return [](auto goal, auto & map, auto it) -> kj::Promise<Result<Goal::WorkResult>> { + auto result = co_await goal->work(); + // a concurrent call to makeGoalCommon may have reset our + // cached goal and replaced it with a new instance. don't + // remove the goal in this case, otherwise we will crash. + if (goal == it->second.goal) { + map.erase(it); + } + co_return result; + }(goal, map, it); + }; + cachedGoal.promise = kj::evalLater(std::move(removeWhenDone)).fork(); + children.add(cachedGoal.promise.addBranch().then([this](auto _result) { + if (_result.has_value()) { + auto & result = _result.value(); + permanentFailure |= result.permanentFailure; + timedOut |= result.timedOut; + hashMismatch |= result.hashMismatch; + checkMismatch |= result.checkMismatch; + } + })); + } else { + if (!modify(*goal)) { + cachedGoal = {}; + continue; + } + } + return {goal, cachedGoal.promise.addBranch()}; } - return goal; + assert(false && "could not make a goal. possible concurrent worker access"); } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath, - const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeDerivationGoal( + const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode +) { - return makeDerivationGoalCommon( + return makeGoalCommon( + derivationGoals, drvPath, - wantedOutputs, - [&]() -> std::shared_ptr<DerivationGoal> { + [&]() -> std::unique_ptr<DerivationGoal> { return !dynamic_cast<LocalStore *>(&store) - ? std::make_shared<DerivationGoal>( + ? std::make_unique<DerivationGoal>( drvPath, wantedOutputs, *this, running, buildMode ) : LocalDerivationGoal::makeLocalDerivationGoal( drvPath, wantedOutputs, *this, running, buildMode ); - } + }, + [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); } ); } -std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath, - const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeBasicDerivationGoal( + const StorePath & drvPath, + const BasicDerivation & drv, + const OutputsSpec & wantedOutputs, + BuildMode buildMode +) { - return makeDerivationGoalCommon( + return makeGoalCommon( + derivationGoals, drvPath, - wantedOutputs, - [&]() -> std::shared_ptr<DerivationGoal> { + [&]() -> std::unique_ptr<DerivationGoal> { return !dynamic_cast<LocalStore *>(&store) - ? std::make_shared<DerivationGoal>( + ? std::make_unique<DerivationGoal>( drvPath, drv, wantedOutputs, *this, running, buildMode ) : LocalDerivationGoal::makeLocalDerivationGoal( drvPath, drv, wantedOutputs, *this, running, buildMode ); - } + }, + [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); } ); } -std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>> +Worker::makePathSubstitutionGoal( + const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path]; - auto goal = goal_weak.lock(); // FIXME - if (!goal) { - goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca); - goal_weak = goal; - wakeUp(goal); - } - return goal; + return makeGoalCommon( + substitutionGoals, + path, + [&] { return std::make_unique<PathSubstitutionGoal>(path, *this, running, repair, ca); }, + [&](auto &) { return true; } + ); } -std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>> +Worker::makeDrvOutputSubstitutionGoal( + const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id]; - auto goal = goal_weak.lock(); // FIXME - if (!goal) { - goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); - goal_weak = goal; - wakeUp(goal); - } - return goal; + return makeGoalCommon( + drvOutputSubstitutionGoals, + id, + [&] { return std::make_unique<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); }, + [&](auto &) { return true; } + ); } -GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) +std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) { return std::visit(overloaded { - [&](const DerivedPath::Built & bfd) -> GoalPtr { + [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> { if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath)) return makeDerivationGoal(bop->path, bfd.outputs, buildMode); else throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); }, - [&](const DerivedPath::Opaque & bo) -> GoalPtr { + [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> { return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); }, }, req.raw()); } +kj::Promise<Result<Worker::Results>> Worker::updateStatistics() +try { + while (true) { + statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire(); -template<typename K, typename G> -static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap) -{ - /* !!! inefficient */ - for (auto i = goalMap.begin(); - i != goalMap.end(); ) - if (i->second.lock() == goal) { - auto j = i; ++j; - goalMap.erase(i); - i = j; - } - else ++i; -} - - -void Worker::goalFinished(GoalPtr goal, Goal::Finished & f) -{ - goal->trace("done"); - assert(!goal->exitCode.has_value()); - goal->exitCode = f.exitCode; - goal->ex = f.ex; - - permanentFailure |= f.permanentFailure; - timedOut |= f.timedOut; - hashMismatch |= f.hashMismatch; - checkMismatch |= f.checkMismatch; - - for (auto & i : goal->waiters) { - if (GoalPtr waiting = i.lock()) { - assert(waiting->waitees.count(goal)); - waiting->waitees.erase(goal); - - waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size())); - - if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed; - if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters; - if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure; - - if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) { - /* If we failed and keepGoing is not set, we remove all - remaining waitees. */ - for (auto & i : waiting->waitees) { - i->waiters.extract(waiting); - } - waiting->waitees.clear(); - - wakeUp(waiting); - } - - waiting->waiteeDone(goal); - } - } - goal->waiters.clear(); - removeGoal(goal); - goal->cleanup(); -} - -void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) -{ - std::visit( - overloaded{ - [&](Goal::StillAlive) {}, - [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, - [&](Goal::ContinueImmediately) { wakeUp(goal); }, - [&](Goal::WaitForGoals & w) { - for (auto & dep : w.goals) { - goal->waitees.insert(dep); - dep->waiters.insert(goal); - } - }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, - [&](Goal::Finished & f) { goalFinished(goal, f); }, - }, - how - ); -} - -void Worker::removeGoal(GoalPtr goal) -{ - if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal)) - nix::removeGoal(drvGoal, derivationGoals); - else if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal)) - nix::removeGoal(subGoal, substitutionGoals); - else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal)) - nix::removeGoal(subGoal, drvOutputSubstitutionGoals); - else - assert(false); - - if (topGoals.find(goal) != topGoals.end()) { - topGoals.erase(goal); - /* If a top-level goal failed, then kill all other goals - (unless keepGoing was set). */ - if (goal->exitCode == Goal::ecFailed && !settings.keepGoing) - topGoals.clear(); - } -} - - -void Worker::wakeUp(GoalPtr goal) -{ - goal->trace("woken up"); - awake.insert(goal); -} - - -void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot) -{ - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - nrSubstitutions++; - break; - case JobCategory::Build: - nrLocalBuilds++; - break; - default: - abort(); - } - } -} - - -void Worker::childTerminated(Goal * goal) -{ - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; - - if (i->inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - assert(nrSubstitutions > 0); - nrSubstitutions--; - break; - case JobCategory::Build: - assert(nrLocalBuilds > 0); - nrLocalBuilds--; - break; - default: - abort(); - } - } - - children.erase(i); - - /* Wake up goals waiting for a build slot. */ - for (auto & j : wantingToBuild) { - GoalPtr goal = j.lock(); - if (goal) wakeUp(goal); - } - - wantingToBuild.clear(); -} - - -void Worker::waitForBuildSlot(GoalPtr goal) -{ - goal->trace("wait for build slot"); - bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution; - if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) || - (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs)) - wakeUp(goal); /* we can do it right away */ - else - wantingToBuild.insert(goal); -} - - -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); -} - - -void Worker::updateStatistics() -{ - // only update progress info while running. this notably excludes updating - // progress info while destroying, which causes the progress bar to assert - if (running && statisticsOutdated) { + // only update progress info while running. this notably excludes updating + // progress info while destroying, which causes the progress bar to assert actDerivations.progress( doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds ); @@ -338,221 +222,82 @@ void Worker::updateStatistics() act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize); act.setExpected(actCopyPath, expectedNarSize + doneNarSize); - statisticsOutdated = false; + // limit to 50fps. that should be more than good enough for anything we do + co_await aio.provider->getTimer().afterDelay(20 * kj::MILLISECONDS); } +} catch (...) { + co_return result::failure(std::current_exception()); } -Goals Worker::run(std::function<Goals (GoalFactory &)> req) +Worker::Results Worker::run(std::function<Targets (GoalFactory &)> req) { - auto _topGoals = req(goalFactory()); + auto topGoals = req(goalFactory()); assert(!running); running = true; Finally const _stop([&] { running = false; }); - updateStatistics(); + auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<Results>>(); + auto interruptCallback = createInterruptCallback([&] { + onInterrupt.fulfiller->fulfill(result::failure(std::make_exception_ptr(makeInterrupted()))); + }); - topGoals = _topGoals; + auto promise = runImpl(std::move(topGoals)) + .exclusiveJoin(updateStatistics()) + .exclusiveJoin(std::move(onInterrupt.promise)); - debug("entered goal loop"); + // TODO GC interface? + if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0u) { + // Periodically wake up to see if we need to run the garbage collector. + promise = promise.exclusiveJoin(boopGC(*localStore)); + } - while (1) { + return promise.wait(aio.waitScope).value(); +} - checkInterrupt(); +kj::Promise<Result<Worker::Results>> Worker::runImpl(Targets topGoals) +try { + debug("entered goal loop"); - // TODO GC interface? - if (auto localStore = dynamic_cast<LocalStore *>(&store)) - localStore->autoGC(false); + kj::Vector<Targets::value_type> promises(topGoals.size()); + for (auto & gp : topGoals) { + promises.add(std::move(gp)); + } - /* Call every wake goal (in the ordering established by - CompareGoalPtrs). */ - while (!awake.empty() && !topGoals.empty()) { - Goals awake2; - for (auto & i : awake) { - GoalPtr goal = i.lock(); - if (goal) awake2.insert(goal); - } - awake.clear(); - for (auto & goal : awake2) { - checkInterrupt(); - /* Make sure that we are always allowed to run at least one substitution. - This prevents infinite waiting. */ - const bool inSlot = goal->jobCategory() == JobCategory::Substitution - ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) - : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); - - if (topGoals.empty()) break; // stuff may have been cancelled - } - } + Results results; - if (topGoals.empty()) break; + auto collect = AsyncCollect(promises.releaseAsArray()); + while (auto done = co_await collect.next()) { + // propagate goal exceptions outward + BOOST_OUTCOME_CO_TRY(auto result, done->second); + results.emplace(done->first, result); - /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) - waitForInput(); - else { - assert(!awake.empty()); + /* If a top-level goal failed, then kill all other goals + (unless keepGoing was set). */ + if (result.exitCode == Goal::ecFailed && !settings.keepGoing) { + children.clear(); + break; } } /* If --keep-going is not set, it's possible that the main goal exited while some of its subgoals were still active. But if --keep-going *is* set, then they must all be finished now. */ - assert(!settings.keepGoing || awake.empty()); - assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); - return _topGoals; + co_return std::move(results); +} catch (...) { + co_return result::failure(std::current_exception()); } -void Worker::waitForInput() -{ - printMsg(lvlVomit, "waiting for children"); - - /* Process output from the file descriptors attached to the - children, namely log output and output path creation commands. - We also use this to detect child termination: if we get EOF on - the logger pipe of a build, we assume that the builder has - terminated. */ - - bool useTimeout = false; - long timeout = 0; - auto before = steady_time_point::clock::now(); - - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count()); - useTimeout = true; - } - - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - useTimeout = true; - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast<std::chrono::seconds>( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - - if (useTimeout) - vomit("sleeping %d seconds", timeout); - - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector<struct pollfd> pollStatus; - std::map<int, size_t> fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; - } - } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } - - auto after = steady_time_point::clock::now(); - - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); - - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; - } - - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } - - std::set<int> fds2(j->fds); - std::vector<unsigned char> buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast<char *>(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } - } - - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); +kj::Promise<Result<Worker::Results>> Worker::boopGC(LocalStore & localStore) +try { + while (true) { + co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS); + localStore.autoGC(false); } +} catch (...) { + co_return result::failure(std::current_exception()); } |