aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build/derivation-goal.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build/derivation-goal.cc')
-rw-r--r--src/libstore/build/derivation-goal.cc407
1 files changed, 267 insertions, 140 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index 827c9f541..96140e10b 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -11,7 +11,13 @@
#include "drv-output-substitution-goal.hh"
#include "strings.hh"
+#include <boost/outcome/try.hpp>
#include <fstream>
+#include <kj/array.h>
+#include <kj/async-unix.h>
+#include <kj/async.h>
+#include <kj/debug.h>
+#include <kj/vector.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -65,7 +71,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath,
, wantedOutputs(wantedOutputs)
, buildMode(buildMode)
{
- state = &DerivationGoal::getDerivation;
name = fmt(
"building of '%s' from .drv file",
DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store));
@@ -85,7 +90,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation
{
this->drv = std::make_unique<Derivation>(drv);
- state = &DerivationGoal::haveDerivation;
name = fmt(
"building of '%s' from in-memory derivation",
DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store));
@@ -103,17 +107,7 @@ DerivationGoal::~DerivationGoal() noexcept(false)
{
/* Careful: we should never ever throw an exception from a
destructor. */
- try { closeLogFile(); } catch (...) { ignoreException(); }
-}
-
-
-std::string DerivationGoal::key()
-{
- /* Ensure that derivations get built in order of their name,
- i.e. a derivation named "aardvark" always comes before
- "baboon". And substitution goals always happen before
- derivation goals (due to "b$"). */
- return "b$" + std::string(drvPath.name()) + "$" + worker.store.printStorePath(drvPath);
+ try { closeLogFile(); } catch (...) { ignoreExceptionInDestructor(); }
}
@@ -124,20 +118,24 @@ void DerivationGoal::killChild()
}
-Goal::Finished DerivationGoal::timedOut(Error && ex)
+Goal::WorkResult DerivationGoal::timedOut(Error && ex)
{
killChild();
return done(BuildResult::TimedOut, {}, std::move(ex));
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::workImpl() noexcept
{
- return (this->*state)(inBuildSlot);
+ return useDerivation ? getDerivation() : haveDerivation();
}
-void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
+bool DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
{
+ if (isDone) {
+ return false;
+ }
+
auto newWanted = wantedOutputs.union_(outputs);
switch (needRestart) {
case NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed:
@@ -154,10 +152,11 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
break;
};
wantedOutputs = newWanted;
+ return true;
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation() noexcept
try {
trace("init");
@@ -165,18 +164,17 @@ try {
exists. If it doesn't, it may be created through a
substitute. */
if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) {
- return loadDerivation(inBuildSlot);
+ co_return co_await loadDerivation();
}
-
- state = &DerivationGoal::loadDerivation;
- return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}};
+ (co_await waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath))).value();
+ co_return co_await loadDerivation();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation() noexcept
try {
trace("loading derivation");
@@ -207,13 +205,13 @@ try {
}
assert(drv);
- return haveDerivation(inBuildSlot);
+ return haveDerivation();
} catch (...) {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation() noexcept
try {
trace("have derivation");
@@ -241,7 +239,7 @@ try {
});
}
- return gaveUpOnSubstitution(inBuildSlot);
+ co_return co_await gaveUpOnSubstitution();
}
for (auto & i : drv->outputsAndOptPaths(worker.store))
@@ -263,19 +261,19 @@ try {
/* If they are all valid, then we're done. */
if (allValid && buildMode == bmNormal) {
- return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
+ co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
}
/* We are first going to try to create the invalid output paths
through substitutes. If that doesn't work, we'll build
them. */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
if (settings.useSubstitutes) {
if (parsedDrv->substitutesAllowed()) {
for (auto & [outputName, status] : initialOutputs) {
if (!status.wanted) continue;
if (!status.known)
- result.goals.insert(
+ dependencies.add(
worker.goalFactory().makeDrvOutputSubstitutionGoal(
DrvOutput{status.outputHash, outputName},
buildMode == bmRepair ? Repair : NoRepair
@@ -283,7 +281,7 @@ try {
);
else {
auto * cap = getDerivationCA(*drv);
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(
status.known->path,
buildMode == bmRepair ? Repair : NoRepair,
cap ? std::optional { *cap } : std::nullopt));
@@ -294,17 +292,15 @@ try {
}
}
- if (result.goals.empty()) { /* to prevent hang (no wake-up event) */
- return outputsSubstitutionTried(inBuildSlot);
- } else {
- state = &DerivationGoal::outputsSubstitutionTried;
- return {std::move(result)};
+ if (!dependencies.empty()) { /* to prevent hang (no wake-up event) */
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await outputsSubstitutionTried();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried() noexcept
try {
trace("all outputs substituted (maybe)");
@@ -354,7 +350,7 @@ try {
if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) {
needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed;
- return haveDerivation(inBuildSlot);
+ return haveDerivation();
}
auto [allValid, validOutputs] = checkPathValidity();
@@ -370,7 +366,7 @@ try {
worker.store.printStorePath(drvPath));
/* Nothing to wait for; tail call */
- return gaveUpOnSubstitution(inBuildSlot);
+ return gaveUpOnSubstitution();
} catch (...) {
return {std::current_exception()};
}
@@ -378,9 +374,9 @@ try {
/* At least one of the output paths could not be
produced using a substitute. So we have to build instead. */
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution() noexcept
try {
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
/* At this point we are building all outputs, so if more are wanted there
is no need to restart. */
@@ -393,7 +389,7 @@ try {
addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) {
if (!inputNode.value.empty())
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = inputDrv,
.outputs = inputNode.value,
@@ -438,17 +434,15 @@ try {
if (!settings.useSubstitutes)
throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled",
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i));
}
- if (result.goals.empty()) {/* to prevent hang (no wake-up event) */
- return inputsRealised(inBuildSlot);
- } else {
- state = &DerivationGoal::inputsRealised;
- return {result};
+ if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await inputsRealised();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
@@ -488,7 +482,7 @@ try {
}
/* Check each path (slow!). */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
for (auto & i : outputClosure) {
if (worker.pathContentsGood(i)) continue;
printError(
@@ -496,9 +490,9 @@ try {
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
auto drvPath2 = outputsToDrv.find(i);
if (drvPath2 == outputsToDrv.end())
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
else
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath2->second),
.outputs = OutputsSpec::All { },
@@ -506,18 +500,18 @@ try {
bmRepair));
}
- if (result.goals.empty()) {
- return {done(BuildResult::AlreadyValid, assertPathValidity())};
+ if (dependencies.empty()) {
+ co_return done(BuildResult::AlreadyValid, assertPathValidity());
}
- state = &DerivationGoal::closureRepaired;
- return {result};
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
+ co_return co_await closureRepaired();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired() noexcept
try {
trace("closure repaired");
if (nrFailed > 0)
@@ -529,14 +523,14 @@ try {
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised() noexcept
try {
trace("all inputs realised");
if (nrFailed != 0) {
if (!useDerivation)
throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath));
- return {done(
+ co_return done(
BuildResult::DependencyFailed,
{},
Error(
@@ -544,12 +538,12 @@ try {
nrFailed,
worker.store.printStorePath(drvPath)
)
- )};
+ );
}
if (retrySubstitution == RetrySubstitution::YesNeed) {
retrySubstitution = RetrySubstitution::AlreadyRetried;
- return haveDerivation(inBuildSlot);
+ co_return co_await haveDerivation();
}
/* Gather information necessary for computing the closure and/or
@@ -611,11 +605,12 @@ try {
worker.store.printStorePath(pathResolved),
});
- resolvedDrvGoal = worker.goalFactory().makeDerivationGoal(
+ auto dependency = worker.goalFactory().makeDerivationGoal(
pathResolved, wantedOutputs, buildMode);
+ resolvedDrvGoal = dependency.first;
- state = &DerivationGoal::resolvedFinished;
- return {WaitForGoals{{resolvedDrvGoal}}};
+ (co_await waitForGoals(std::move(dependency))).value();
+ co_return co_await resolvedFinished();
}
std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths;
@@ -679,10 +674,9 @@ try {
/* Okay, try to build. Note that here we don't wait for a build
slot to become available, since we don't need one if there is a
build hook. */
- state = &DerivationGoal::tryToBuild;
- return tryToBuild(inBuildSlot);
+ co_return co_await tryToBuild();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
void DerivationGoal::started()
@@ -698,8 +692,9 @@ void DerivationGoal::started()
mcRunningBuilds = worker.runningBuilds.addTemporarily(1);
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild() noexcept
try {
+retry:
trace("trying to build");
/* Obtain locks on all output paths, if the paths are known a priori.
@@ -733,7 +728,9 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
- return {WaitForAWhile{}};
+ co_await waitForAWhile();
+ // we can loop very often, and `co_return co_await` always allocates a new frame
+ goto retry;
}
actLock.reset();
@@ -750,7 +747,7 @@ try {
if (buildMode != bmCheck && allValid) {
debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath));
outputLocks.setDeletion(true);
- return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
+ co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
}
/* If any of the outputs already exist but are not valid, delete
@@ -770,47 +767,56 @@ try {
&& settings.maxBuildJobs.get() != 0;
if (!buildLocally) {
- auto hookReply = tryBuildHook(inBuildSlot);
- auto result = std::visit(
- overloaded{
- [&](HookReply::Accept & a) -> std::optional<WorkResult> {
- /* Yes, it has started doing so. Wait until we get
- EOF from the hook. */
- actLock.reset();
- buildResult.startTime = time(0); // inexact
- state = &DerivationGoal::buildDone;
- started();
- return WaitForWorld{std::move(a.fds), false};
- },
- [&](HookReply::Postpone) -> std::optional<WorkResult> {
- /* Not now; wait until at least one child finishes or
- the wake-up timeout expires. */
- if (!actLock)
- actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
- fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
- outputLocks.unlock();
- return WaitForAWhile{};
- },
- [&](HookReply::Decline) -> std::optional<WorkResult> {
- /* We should do it ourselves. */
- return std::nullopt;
- },
- },
- hookReply);
- if (result) {
- return {std::move(*result)};
+ auto hookReply = tryBuildHook();
+ switch (hookReply.index()) {
+ case 0: {
+ HookReply::Accept & a = std::get<0>(hookReply);
+ /* Yes, it has started doing so. Wait until we get
+ EOF from the hook. */
+ actLock.reset();
+ buildResult.startTime = time(0); // inexact
+ started();
+ auto r = co_await a.promise;
+ if (r.has_value()) {
+ co_return co_await buildDone();
+ } else if (r.has_error()) {
+ co_return r.assume_error();
+ } else {
+ co_return r.assume_exception();
+ }
+ }
+
+ case 1: {
+ HookReply::Decline _ [[gnu::unused]] = std::get<1>(hookReply);
+ break;
+ }
+
+ case 2: {
+ HookReply::Postpone _ [[gnu::unused]] = std::get<2>(hookReply);
+ /* Not now; wait until at least one child finishes or
+ the wake-up timeout expires. */
+ if (!actLock)
+ actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
+ fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
+ outputLocks.unlock();
+ co_await waitForAWhile();
+ goto retry;
+ }
+
+ default:
+ // can't static_assert this because HookReply *subclasses* variant and std::variant_size breaks
+ assert(false && "unexpected hook reply");
}
}
actLock.reset();
- state = &DerivationGoal::tryLocalBuild;
- return tryLocalBuild(inBuildSlot);
+ co_return co_await tryLocalBuild();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild() noexcept
try {
throw Error(
"unable to build with a primary store that isn't a local store; "
@@ -857,7 +863,7 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath)
// attempt to recover
movePath(oldPath, storePath);
} catch (...) {
- ignoreException();
+ ignoreExceptionExceptInterrupt();
}
throw;
}
@@ -973,10 +979,11 @@ void runPostBuildHook(
proc.getStdout()->drainInto(sink);
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone() noexcept
try {
trace("build done");
+ slotToken = {};
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
cleanupPreChildKill();
@@ -992,9 +999,6 @@ try {
buildResult.timesBuilt++;
buildResult.stopTime = time(0);
- /* So the child is gone now. */
- worker.childTerminated(this);
-
/* Close the read side of the logger pipe. */
closeReadPipes();
@@ -1095,7 +1099,7 @@ try {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished() noexcept
try {
trace("resolved derivation finished");
@@ -1168,7 +1172,7 @@ try {
return {std::current_exception()};
}
-HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
+HookReply DerivationGoal::tryBuildHook()
{
if (!worker.hook.available || !useDerivation) return HookReply::Decline{};
@@ -1180,7 +1184,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Send the request to the hook. */
worker.hook.instance->sink
<< "try"
- << (inBuildSlot ? 1 : 0)
+ << (slotToken.valid() ? 1 : 0)
<< drv->platform
<< worker.store.printStorePath(drvPath)
<< parsedDrv->getRequiredSystemFeatures();
@@ -1207,6 +1211,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
else {
s += "\n";
writeLogsToStderr(s);
+ logger->log(lvlInfo, s);
}
}
@@ -1266,12 +1271,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Create the log file and pipe. */
Path logFile = openLogFile();
- std::set<int> fds;
- fds.insert(hook->fromHook.get());
- fds.insert(hook->builderOut.get());
builderOutFD = &hook->builderOut;
-
- return HookReply::Accept{std::move(fds)};
+ return HookReply::Accept{handleChildOutput()};
}
@@ -1331,23 +1332,69 @@ void DerivationGoal::closeLogFile()
}
-Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data)
+Goal::WorkResult DerivationGoal::tooMuchLogs()
{
- assert(builderOutFD);
+ killChild();
+ return done(
+ BuildResult::LogLimitExceeded, {},
+ Error("%s killed after writing more than %d bytes of log output",
+ getName(), settings.maxLogSize));
+}
- auto tooMuchLogs = [&] {
- killChild();
- return done(
- BuildResult::LogLimitExceeded, {},
- Error("%s killed after writing more than %d bytes of log output",
- getName(), settings.maxLogSize));
- };
+struct DerivationGoal::InputStream final : private kj::AsyncObject
+{
+ int fd;
+ kj::UnixEventPort::FdObserver observer;
+
+ InputStream(kj::UnixEventPort & ep, int fd)
+ : fd(fd)
+ , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ)
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (flags < 0) {
+ throw SysError("fcntl(F_GETFL) failed on fd %i", fd);
+ }
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ throw SysError("fcntl(F_SETFL) failed on fd %i", fd);
+ }
+ }
+
+ kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer)
+ {
+ const auto res = ::read(fd, buffer.begin(), buffer.size());
+ // closing a pty endpoint causes EIO on the other endpoint. stock kj streams
+ // do not handle this and throw exceptions we can't ask for errno instead :(
+ // (we can't use `errno` either because kj may well have mangled it by now.)
+ if (res == 0 || (res == -1 && errno == EIO)) {
+ return std::string_view{};
+ }
+
+ KJ_NONBLOCKING_SYSCALL(res) {}
+
+ if (res > 0) {
+ return std::string_view{buffer.begin(), static_cast<size_t>(res)};
+ }
+
+ return observer.whenBecomesReadable().then([this, buffer] {
+ return read(buffer);
+ });
+ }
+};
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- // local & `ssh://`-builds are dealt with here.
- if (fd == builderOutFD->get()) {
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
for (auto c : data)
@@ -1362,10 +1409,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
}
if (logSink) (*logSink)(data);
- return StillAlive{};
}
+} catch (...) {
+ co_return std::current_exception();
+}
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- if (hook && fd == hook->fromHook.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine);
@@ -1381,7 +1440,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
(fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n";
logSize += logLine.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
(*logSink)(logLine);
} else if (type == resSetPhase && ! fields.is_null()) {
@@ -1405,16 +1464,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
} else
currentHookLine += c;
}
-
- return StillAlive{};
+} catch (...) {
+ co_return std::current_exception();
}
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleChildOutput() noexcept
+try {
+ assert(builderOutFD);
+
+ auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get());
+ kj::Own<InputStream> hookIn;
+ if (hook) {
+ hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get());
+ }
-void DerivationGoal::handleEOF(int fd)
+ auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn));
+
+ if (respectsTimeouts() && settings.buildTimeout != 0) {
+ handlers = handlers.exclusiveJoin(
+ worker.aio.provider->getTimer()
+ .afterDelay(settings.buildTimeout.get() * kj::SECONDS)
+ .then([this]() -> Outcome<void, WorkResult> {
+ return timedOut(
+ Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
+ );
+ })
+ );
+ }
+
+ return handlers.then([this](auto r) -> Outcome<void, WorkResult> {
+ if (!currentLogLine.empty()) flushLine();
+ return r;
+ });
+} catch (...) {
+ return {std::current_exception()};
+}
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::monitorForSilence() noexcept
{
- if (!currentLogLine.empty()) flushLine();
+ while (true) {
+ const auto stash = lastChildActivity;
+ auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS;
+ co_await worker.aio.provider->getTimer().atTime(waitUntil);
+ if (lastChildActivity == stash) {
+ co_return timedOut(
+ Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime)
+ );
+ }
+ }
}
+kj::Promise<Outcome<void, Goal::WorkResult>>
+DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
+{
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ auto handlers = kj::joinPromisesFailFast([&] {
+ kj::Vector<kj::Promise<Outcome<void, WorkResult>>> parts{2};
+
+ parts.add(handleBuilderOutput(builderIn));
+ if (hookIn) {
+ parts.add(handleHookOutput(*hookIn));
+ }
+
+ return parts.releaseAsArray();
+ }());
+
+ if (respectsTimeouts() && settings.maxSilentTime != 0) {
+ handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) {
+ return kj::arr(std::move(r));
+ }));
+ }
+
+ for (auto r : co_await handlers) {
+ BOOST_OUTCOME_CO_TRYV(r);
+ }
+ co_return result::success();
+}
void DerivationGoal::flushLine()
{
@@ -1555,11 +1681,13 @@ SingleDrvOutputs DerivationGoal::assertPathValidity()
}
-Goal::Finished DerivationGoal::done(
+Goal::WorkResult DerivationGoal::done(
BuildResult::Status status,
SingleDrvOutputs builtOutputs,
std::optional<Error> ex)
{
+ isDone = true;
+
outputLocks.unlock();
buildResult.status = status;
if (ex)
@@ -1590,7 +1718,7 @@ Goal::Finished DerivationGoal::done(
logError(ex->info());
}
- return Finished{
+ return WorkResult{
.exitCode = buildResult.success() ? ecSuccess : ecFailed,
.result = buildResult,
.ex = ex ? std::make_shared<Error>(std::move(*ex)) : nullptr,
@@ -1629,5 +1757,4 @@ void DerivationGoal::waiteeDone(GoalPtr waitee)
}
}
}
-
}