diff options
Diffstat (limited to 'src/libstore/build/derivation-goal.cc')
-rw-r--r-- | src/libstore/build/derivation-goal.cc | 407 |
1 files changed, 267 insertions, 140 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 827c9f541..96140e10b 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -11,7 +11,13 @@ #include "drv-output-substitution-goal.hh" #include "strings.hh" +#include <boost/outcome/try.hpp> #include <fstream> +#include <kj/array.h> +#include <kj/async-unix.h> +#include <kj/async.h> +#include <kj/debug.h> +#include <kj/vector.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> @@ -65,7 +71,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, , wantedOutputs(wantedOutputs) , buildMode(buildMode) { - state = &DerivationGoal::getDerivation; name = fmt( "building of '%s' from .drv file", DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store)); @@ -85,7 +90,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation { this->drv = std::make_unique<Derivation>(drv); - state = &DerivationGoal::haveDerivation; name = fmt( "building of '%s' from in-memory derivation", DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store)); @@ -103,17 +107,7 @@ DerivationGoal::~DerivationGoal() noexcept(false) { /* Careful: we should never ever throw an exception from a destructor. */ - try { closeLogFile(); } catch (...) { ignoreException(); } -} - - -std::string DerivationGoal::key() -{ - /* Ensure that derivations get built in order of their name, - i.e. a derivation named "aardvark" always comes before - "baboon". And substitution goals always happen before - derivation goals (due to "b$"). */ - return "b$" + std::string(drvPath.name()) + "$" + worker.store.printStorePath(drvPath); + try { closeLogFile(); } catch (...) { ignoreExceptionInDestructor(); } } @@ -124,20 +118,24 @@ void DerivationGoal::killChild() } -Goal::Finished DerivationGoal::timedOut(Error && ex) +Goal::WorkResult DerivationGoal::timedOut(Error && ex) { killChild(); return done(BuildResult::TimedOut, {}, std::move(ex)); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::workImpl() noexcept { - return (this->*state)(inBuildSlot); + return useDerivation ? getDerivation() : haveDerivation(); } -void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) +bool DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) { + if (isDone) { + return false; + } + auto newWanted = wantedOutputs.union_(outputs); switch (needRestart) { case NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed: @@ -154,10 +152,11 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) break; }; wantedOutputs = newWanted; + return true; } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation() noexcept try { trace("init"); @@ -165,18 +164,17 @@ try { exists. If it doesn't, it may be created through a substitute. */ if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) { - return loadDerivation(inBuildSlot); + co_return co_await loadDerivation(); } - - state = &DerivationGoal::loadDerivation; - return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}}; + (co_await waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath))).value(); + co_return co_await loadDerivation(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation() noexcept try { trace("loading derivation"); @@ -207,13 +205,13 @@ try { } assert(drv); - return haveDerivation(inBuildSlot); + return haveDerivation(); } catch (...) { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation() noexcept try { trace("have derivation"); @@ -241,7 +239,7 @@ try { }); } - return gaveUpOnSubstitution(inBuildSlot); + co_return co_await gaveUpOnSubstitution(); } for (auto & i : drv->outputsAndOptPaths(worker.store)) @@ -263,19 +261,19 @@ try { /* If they are all valid, then we're done. */ if (allValid && buildMode == bmNormal) { - return {done(BuildResult::AlreadyValid, std::move(validOutputs))}; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* We are first going to try to create the invalid output paths through substitutes. If that doesn't work, we'll build them. */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies; if (settings.useSubstitutes) { if (parsedDrv->substitutesAllowed()) { for (auto & [outputName, status] : initialOutputs) { if (!status.wanted) continue; if (!status.known) - result.goals.insert( + dependencies.add( worker.goalFactory().makeDrvOutputSubstitutionGoal( DrvOutput{status.outputHash, outputName}, buildMode == bmRepair ? Repair : NoRepair @@ -283,7 +281,7 @@ try { ); else { auto * cap = getDerivationCA(*drv); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal( + dependencies.add(worker.goalFactory().makePathSubstitutionGoal( status.known->path, buildMode == bmRepair ? Repair : NoRepair, cap ? std::optional { *cap } : std::nullopt)); @@ -294,17 +292,15 @@ try { } } - if (result.goals.empty()) { /* to prevent hang (no wake-up event) */ - return outputsSubstitutionTried(inBuildSlot); - } else { - state = &DerivationGoal::outputsSubstitutionTried; - return {std::move(result)}; + if (!dependencies.empty()) { /* to prevent hang (no wake-up event) */ + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await outputsSubstitutionTried(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried() noexcept try { trace("all outputs substituted (maybe)"); @@ -354,7 +350,7 @@ try { if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) { needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed; - return haveDerivation(inBuildSlot); + return haveDerivation(); } auto [allValid, validOutputs] = checkPathValidity(); @@ -370,7 +366,7 @@ try { worker.store.printStorePath(drvPath)); /* Nothing to wait for; tail call */ - return gaveUpOnSubstitution(inBuildSlot); + return gaveUpOnSubstitution(); } catch (...) { return {std::current_exception()}; } @@ -378,9 +374,9 @@ try { /* At least one of the output paths could not be produced using a substitute. So we have to build instead. */ -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution() noexcept try { - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies; /* At this point we are building all outputs, so if more are wanted there is no need to restart. */ @@ -393,7 +389,7 @@ try { addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) { if (!inputNode.value.empty()) - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = inputDrv, .outputs = inputNode.value, @@ -438,17 +434,15 @@ try { if (!settings.useSubstitutes) throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled", worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i)); } - if (result.goals.empty()) {/* to prevent hang (no wake-up event) */ - return inputsRealised(inBuildSlot); - } else { - state = &DerivationGoal::inputsRealised; - return {result}; + if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */ + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await inputsRealised(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } @@ -488,7 +482,7 @@ try { } /* Check each path (slow!). */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies; for (auto & i : outputClosure) { if (worker.pathContentsGood(i)) continue; printError( @@ -496,9 +490,9 @@ try { worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); auto drvPath2 = outputsToDrv.find(i); if (drvPath2 == outputsToDrv.end()) - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); else - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath2->second), .outputs = OutputsSpec::All { }, @@ -506,18 +500,18 @@ try { bmRepair)); } - if (result.goals.empty()) { - return {done(BuildResult::AlreadyValid, assertPathValidity())}; + if (dependencies.empty()) { + co_return done(BuildResult::AlreadyValid, assertPathValidity()); } - state = &DerivationGoal::closureRepaired; - return {result}; + (co_await waitForGoals(dependencies.releaseAsArray())).value(); + co_return co_await closureRepaired(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired() noexcept try { trace("closure repaired"); if (nrFailed > 0) @@ -529,14 +523,14 @@ try { } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised() noexcept try { trace("all inputs realised"); if (nrFailed != 0) { if (!useDerivation) throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath)); - return {done( + co_return done( BuildResult::DependencyFailed, {}, Error( @@ -544,12 +538,12 @@ try { nrFailed, worker.store.printStorePath(drvPath) ) - )}; + ); } if (retrySubstitution == RetrySubstitution::YesNeed) { retrySubstitution = RetrySubstitution::AlreadyRetried; - return haveDerivation(inBuildSlot); + co_return co_await haveDerivation(); } /* Gather information necessary for computing the closure and/or @@ -611,11 +605,12 @@ try { worker.store.printStorePath(pathResolved), }); - resolvedDrvGoal = worker.goalFactory().makeDerivationGoal( + auto dependency = worker.goalFactory().makeDerivationGoal( pathResolved, wantedOutputs, buildMode); + resolvedDrvGoal = dependency.first; - state = &DerivationGoal::resolvedFinished; - return {WaitForGoals{{resolvedDrvGoal}}}; + (co_await waitForGoals(std::move(dependency))).value(); + co_return co_await resolvedFinished(); } std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths; @@ -679,10 +674,9 @@ try { /* Okay, try to build. Note that here we don't wait for a build slot to become available, since we don't need one if there is a build hook. */ - state = &DerivationGoal::tryToBuild; - return tryToBuild(inBuildSlot); + co_return co_await tryToBuild(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } void DerivationGoal::started() @@ -698,8 +692,9 @@ void DerivationGoal::started() mcRunningBuilds = worker.runningBuilds.addTemporarily(1); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild() noexcept try { +retry: trace("trying to build"); /* Obtain locks on all output paths, if the paths are known a priori. @@ -733,7 +728,9 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); - return {WaitForAWhile{}}; + co_await waitForAWhile(); + // we can loop very often, and `co_return co_await` always allocates a new frame + goto retry; } actLock.reset(); @@ -750,7 +747,7 @@ try { if (buildMode != bmCheck && allValid) { debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath)); outputLocks.setDeletion(true); - return {done(BuildResult::AlreadyValid, std::move(validOutputs))}; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* If any of the outputs already exist but are not valid, delete @@ -770,47 +767,56 @@ try { && settings.maxBuildJobs.get() != 0; if (!buildLocally) { - auto hookReply = tryBuildHook(inBuildSlot); - auto result = std::visit( - overloaded{ - [&](HookReply::Accept & a) -> std::optional<WorkResult> { - /* Yes, it has started doing so. Wait until we get - EOF from the hook. */ - actLock.reset(); - buildResult.startTime = time(0); // inexact - state = &DerivationGoal::buildDone; - started(); - return WaitForWorld{std::move(a.fds), false}; - }, - [&](HookReply::Postpone) -> std::optional<WorkResult> { - /* Not now; wait until at least one child finishes or - the wake-up timeout expires. */ - if (!actLock) - actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, - fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); - outputLocks.unlock(); - return WaitForAWhile{}; - }, - [&](HookReply::Decline) -> std::optional<WorkResult> { - /* We should do it ourselves. */ - return std::nullopt; - }, - }, - hookReply); - if (result) { - return {std::move(*result)}; + auto hookReply = tryBuildHook(); + switch (hookReply.index()) { + case 0: { + HookReply::Accept & a = std::get<0>(hookReply); + /* Yes, it has started doing so. Wait until we get + EOF from the hook. */ + actLock.reset(); + buildResult.startTime = time(0); // inexact + started(); + auto r = co_await a.promise; + if (r.has_value()) { + co_return co_await buildDone(); + } else if (r.has_error()) { + co_return r.assume_error(); + } else { + co_return r.assume_exception(); + } + } + + case 1: { + HookReply::Decline _ [[gnu::unused]] = std::get<1>(hookReply); + break; + } + + case 2: { + HookReply::Postpone _ [[gnu::unused]] = std::get<2>(hookReply); + /* Not now; wait until at least one child finishes or + the wake-up timeout expires. */ + if (!actLock) + actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, + fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); + outputLocks.unlock(); + co_await waitForAWhile(); + goto retry; + } + + default: + // can't static_assert this because HookReply *subclasses* variant and std::variant_size breaks + assert(false && "unexpected hook reply"); } } actLock.reset(); - state = &DerivationGoal::tryLocalBuild; - return tryLocalBuild(inBuildSlot); + co_return co_await tryLocalBuild(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild() noexcept try { throw Error( "unable to build with a primary store that isn't a local store; " @@ -857,7 +863,7 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath) // attempt to recover movePath(oldPath, storePath); } catch (...) { - ignoreException(); + ignoreExceptionExceptInterrupt(); } throw; } @@ -973,10 +979,11 @@ void runPostBuildHook( proc.getStdout()->drainInto(sink); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone() noexcept try { trace("build done"); + slotToken = {}; Finally releaseBuildUser([&](){ this->cleanupHookFinally(); }); cleanupPreChildKill(); @@ -992,9 +999,6 @@ try { buildResult.timesBuilt++; buildResult.stopTime = time(0); - /* So the child is gone now. */ - worker.childTerminated(this); - /* Close the read side of the logger pipe. */ closeReadPipes(); @@ -1095,7 +1099,7 @@ try { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished() noexcept try { trace("resolved derivation finished"); @@ -1168,7 +1172,7 @@ try { return {std::current_exception()}; } -HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) +HookReply DerivationGoal::tryBuildHook() { if (!worker.hook.available || !useDerivation) return HookReply::Decline{}; @@ -1180,7 +1184,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Send the request to the hook. */ worker.hook.instance->sink << "try" - << (inBuildSlot ? 1 : 0) + << (slotToken.valid() ? 1 : 0) << drv->platform << worker.store.printStorePath(drvPath) << parsedDrv->getRequiredSystemFeatures(); @@ -1207,6 +1211,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) else { s += "\n"; writeLogsToStderr(s); + logger->log(lvlInfo, s); } } @@ -1266,12 +1271,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set<int> fds; - fds.insert(hook->fromHook.get()); - fds.insert(hook->builderOut.get()); builderOutFD = &hook->builderOut; - - return HookReply::Accept{std::move(fds)}; + return HookReply::Accept{handleChildOutput()}; } @@ -1331,23 +1332,69 @@ void DerivationGoal::closeLogFile() } -Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data) +Goal::WorkResult DerivationGoal::tooMuchLogs() { - assert(builderOutFD); + killChild(); + return done( + BuildResult::LogLimitExceeded, {}, + Error("%s killed after writing more than %d bytes of log output", + getName(), settings.maxLogSize)); +} - auto tooMuchLogs = [&] { - killChild(); - return done( - BuildResult::LogLimitExceeded, {}, - Error("%s killed after writing more than %d bytes of log output", - getName(), settings.maxLogSize)); - }; +struct DerivationGoal::InputStream final : private kj::AsyncObject +{ + int fd; + kj::UnixEventPort::FdObserver observer; + + InputStream(kj::UnixEventPort & ep, int fd) + : fd(fd) + , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ) + { + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + throw SysError("fcntl(F_GETFL) failed on fd %i", fd); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + throw SysError("fcntl(F_SETFL) failed on fd %i", fd); + } + } + + kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer) + { + const auto res = ::read(fd, buffer.begin(), buffer.size()); + // closing a pty endpoint causes EIO on the other endpoint. stock kj streams + // do not handle this and throw exceptions we can't ask for errno instead :( + // (we can't use `errno` either because kj may well have mangled it by now.) + if (res == 0 || (res == -1 && errno == EIO)) { + return std::string_view{}; + } + + KJ_NONBLOCKING_SYSCALL(res) {} + + if (res > 0) { + return std::string_view{buffer.begin(), static_cast<size_t>(res)}; + } + + return observer.whenBecomesReadable().then([this, buffer] { + return read(buffer); + }); + } +}; + +kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - // local & `ssh://`-builds are dealt with here. - if (fd == builderOutFD->get()) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } for (auto c : data) @@ -1362,10 +1409,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } if (logSink) (*logSink)(data); - return StillAlive{}; } +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleHookOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - if (hook && fd == hook->fromHook.get()) { for (auto c : data) if (c == '\n') { auto json = parseJSONMessage(currentHookLine); @@ -1381,7 +1440,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data (fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n"; logSize += logLine.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } (*logSink)(logLine); } else if (type == resSetPhase && ! fields.is_null()) { @@ -1405,16 +1464,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } else currentHookLine += c; } - - return StillAlive{}; +} catch (...) { + co_return std::current_exception(); } +kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleChildOutput() noexcept +try { + assert(builderOutFD); + + auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get()); + kj::Own<InputStream> hookIn; + if (hook) { + hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get()); + } -void DerivationGoal::handleEOF(int fd) + auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); + + if (respectsTimeouts() && settings.buildTimeout != 0) { + handlers = handlers.exclusiveJoin( + worker.aio.provider->getTimer() + .afterDelay(settings.buildTimeout.get() * kj::SECONDS) + .then([this]() -> Outcome<void, WorkResult> { + return timedOut( + Error("%1% timed out after %2% seconds", name, settings.buildTimeout) + ); + }) + ); + } + + return handlers.then([this](auto r) -> Outcome<void, WorkResult> { + if (!currentLogLine.empty()) flushLine(); + return r; + }); +} catch (...) { + return {std::current_exception()}; +} + +kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::monitorForSilence() noexcept { - if (!currentLogLine.empty()) flushLine(); + while (true) { + const auto stash = lastChildActivity; + auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS; + co_await worker.aio.provider->getTimer().atTime(waitUntil); + if (lastChildActivity == stash) { + co_return timedOut( + Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime) + ); + } + } } +kj::Promise<Outcome<void, Goal::WorkResult>> +DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept +{ + lastChildActivity = worker.aio.provider->getTimer().now(); + + auto handlers = kj::joinPromisesFailFast([&] { + kj::Vector<kj::Promise<Outcome<void, WorkResult>>> parts{2}; + + parts.add(handleBuilderOutput(builderIn)); + if (hookIn) { + parts.add(handleHookOutput(*hookIn)); + } + + return parts.releaseAsArray(); + }()); + + if (respectsTimeouts() && settings.maxSilentTime != 0) { + handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) { + return kj::arr(std::move(r)); + })); + } + + for (auto r : co_await handlers) { + BOOST_OUTCOME_CO_TRYV(r); + } + co_return result::success(); +} void DerivationGoal::flushLine() { @@ -1555,11 +1681,13 @@ SingleDrvOutputs DerivationGoal::assertPathValidity() } -Goal::Finished DerivationGoal::done( +Goal::WorkResult DerivationGoal::done( BuildResult::Status status, SingleDrvOutputs builtOutputs, std::optional<Error> ex) { + isDone = true; + outputLocks.unlock(); buildResult.status = status; if (ex) @@ -1590,7 +1718,7 @@ Goal::Finished DerivationGoal::done( logError(ex->info()); } - return Finished{ + return WorkResult{ .exitCode = buildResult.success() ? ecSuccess : ecFailed, .result = buildResult, .ex = ex ? std::make_shared<Error>(std::move(*ex)) : nullptr, @@ -1629,5 +1757,4 @@ void DerivationGoal::waiteeDone(GoalPtr waitee) } } } - } |