aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build')
-rw-r--r--src/libstore/build/derivation-goal.cc407
-rw-r--r--src/libstore/build/derivation-goal.hh76
-rw-r--r--src/libstore/build/drv-output-substitution-goal.cc75
-rw-r--r--src/libstore/build/drv-output-substitution-goal.hh18
-rw-r--r--src/libstore/build/entry-points.cc60
-rw-r--r--src/libstore/build/goal.cc69
-rw-r--r--src/libstore/build/goal.hh109
-rw-r--r--src/libstore/build/hook-instance.cc3
-rw-r--r--src/libstore/build/local-derivation-goal.cc72
-rw-r--r--src/libstore/build/local-derivation-goal.hh8
-rw-r--r--src/libstore/build/substitution-goal.cc160
-rw-r--r--src/libstore/build/substitution-goal.hh34
-rw-r--r--src/libstore/build/worker.cc605
-rw-r--r--src/libstore/build/worker.hh188
14 files changed, 837 insertions, 1047 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index 827c9f541..96140e10b 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -11,7 +11,13 @@
#include "drv-output-substitution-goal.hh"
#include "strings.hh"
+#include <boost/outcome/try.hpp>
#include <fstream>
+#include <kj/array.h>
+#include <kj/async-unix.h>
+#include <kj/async.h>
+#include <kj/debug.h>
+#include <kj/vector.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -65,7 +71,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath,
, wantedOutputs(wantedOutputs)
, buildMode(buildMode)
{
- state = &DerivationGoal::getDerivation;
name = fmt(
"building of '%s' from .drv file",
DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store));
@@ -85,7 +90,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation
{
this->drv = std::make_unique<Derivation>(drv);
- state = &DerivationGoal::haveDerivation;
name = fmt(
"building of '%s' from in-memory derivation",
DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store));
@@ -103,17 +107,7 @@ DerivationGoal::~DerivationGoal() noexcept(false)
{
/* Careful: we should never ever throw an exception from a
destructor. */
- try { closeLogFile(); } catch (...) { ignoreException(); }
-}
-
-
-std::string DerivationGoal::key()
-{
- /* Ensure that derivations get built in order of their name,
- i.e. a derivation named "aardvark" always comes before
- "baboon". And substitution goals always happen before
- derivation goals (due to "b$"). */
- return "b$" + std::string(drvPath.name()) + "$" + worker.store.printStorePath(drvPath);
+ try { closeLogFile(); } catch (...) { ignoreExceptionInDestructor(); }
}
@@ -124,20 +118,24 @@ void DerivationGoal::killChild()
}
-Goal::Finished DerivationGoal::timedOut(Error && ex)
+Goal::WorkResult DerivationGoal::timedOut(Error && ex)
{
killChild();
return done(BuildResult::TimedOut, {}, std::move(ex));
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::workImpl() noexcept
{
- return (this->*state)(inBuildSlot);
+ return useDerivation ? getDerivation() : haveDerivation();
}
-void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
+bool DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
{
+ if (isDone) {
+ return false;
+ }
+
auto newWanted = wantedOutputs.union_(outputs);
switch (needRestart) {
case NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed:
@@ -154,10 +152,11 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
break;
};
wantedOutputs = newWanted;
+ return true;
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation() noexcept
try {
trace("init");
@@ -165,18 +164,17 @@ try {
exists. If it doesn't, it may be created through a
substitute. */
if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) {
- return loadDerivation(inBuildSlot);
+ co_return co_await loadDerivation();
}
-
- state = &DerivationGoal::loadDerivation;
- return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}};
+ (co_await waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath))).value();
+ co_return co_await loadDerivation();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation() noexcept
try {
trace("loading derivation");
@@ -207,13 +205,13 @@ try {
}
assert(drv);
- return haveDerivation(inBuildSlot);
+ return haveDerivation();
} catch (...) {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation() noexcept
try {
trace("have derivation");
@@ -241,7 +239,7 @@ try {
});
}
- return gaveUpOnSubstitution(inBuildSlot);
+ co_return co_await gaveUpOnSubstitution();
}
for (auto & i : drv->outputsAndOptPaths(worker.store))
@@ -263,19 +261,19 @@ try {
/* If they are all valid, then we're done. */
if (allValid && buildMode == bmNormal) {
- return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
+ co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
}
/* We are first going to try to create the invalid output paths
through substitutes. If that doesn't work, we'll build
them. */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
if (settings.useSubstitutes) {
if (parsedDrv->substitutesAllowed()) {
for (auto & [outputName, status] : initialOutputs) {
if (!status.wanted) continue;
if (!status.known)
- result.goals.insert(
+ dependencies.add(
worker.goalFactory().makeDrvOutputSubstitutionGoal(
DrvOutput{status.outputHash, outputName},
buildMode == bmRepair ? Repair : NoRepair
@@ -283,7 +281,7 @@ try {
);
else {
auto * cap = getDerivationCA(*drv);
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(
status.known->path,
buildMode == bmRepair ? Repair : NoRepair,
cap ? std::optional { *cap } : std::nullopt));
@@ -294,17 +292,15 @@ try {
}
}
- if (result.goals.empty()) { /* to prevent hang (no wake-up event) */
- return outputsSubstitutionTried(inBuildSlot);
- } else {
- state = &DerivationGoal::outputsSubstitutionTried;
- return {std::move(result)};
+ if (!dependencies.empty()) { /* to prevent hang (no wake-up event) */
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await outputsSubstitutionTried();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried() noexcept
try {
trace("all outputs substituted (maybe)");
@@ -354,7 +350,7 @@ try {
if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) {
needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed;
- return haveDerivation(inBuildSlot);
+ return haveDerivation();
}
auto [allValid, validOutputs] = checkPathValidity();
@@ -370,7 +366,7 @@ try {
worker.store.printStorePath(drvPath));
/* Nothing to wait for; tail call */
- return gaveUpOnSubstitution(inBuildSlot);
+ return gaveUpOnSubstitution();
} catch (...) {
return {std::current_exception()};
}
@@ -378,9 +374,9 @@ try {
/* At least one of the output paths could not be
produced using a substitute. So we have to build instead. */
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution() noexcept
try {
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
/* At this point we are building all outputs, so if more are wanted there
is no need to restart. */
@@ -393,7 +389,7 @@ try {
addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) {
if (!inputNode.value.empty())
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = inputDrv,
.outputs = inputNode.value,
@@ -438,17 +434,15 @@ try {
if (!settings.useSubstitutes)
throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled",
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i));
}
- if (result.goals.empty()) {/* to prevent hang (no wake-up event) */
- return inputsRealised(inBuildSlot);
- } else {
- state = &DerivationGoal::inputsRealised;
- return {result};
+ if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await inputsRealised();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
@@ -488,7 +482,7 @@ try {
}
/* Check each path (slow!). */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
for (auto & i : outputClosure) {
if (worker.pathContentsGood(i)) continue;
printError(
@@ -496,9 +490,9 @@ try {
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
auto drvPath2 = outputsToDrv.find(i);
if (drvPath2 == outputsToDrv.end())
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
else
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath2->second),
.outputs = OutputsSpec::All { },
@@ -506,18 +500,18 @@ try {
bmRepair));
}
- if (result.goals.empty()) {
- return {done(BuildResult::AlreadyValid, assertPathValidity())};
+ if (dependencies.empty()) {
+ co_return done(BuildResult::AlreadyValid, assertPathValidity());
}
- state = &DerivationGoal::closureRepaired;
- return {result};
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
+ co_return co_await closureRepaired();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired() noexcept
try {
trace("closure repaired");
if (nrFailed > 0)
@@ -529,14 +523,14 @@ try {
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised() noexcept
try {
trace("all inputs realised");
if (nrFailed != 0) {
if (!useDerivation)
throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath));
- return {done(
+ co_return done(
BuildResult::DependencyFailed,
{},
Error(
@@ -544,12 +538,12 @@ try {
nrFailed,
worker.store.printStorePath(drvPath)
)
- )};
+ );
}
if (retrySubstitution == RetrySubstitution::YesNeed) {
retrySubstitution = RetrySubstitution::AlreadyRetried;
- return haveDerivation(inBuildSlot);
+ co_return co_await haveDerivation();
}
/* Gather information necessary for computing the closure and/or
@@ -611,11 +605,12 @@ try {
worker.store.printStorePath(pathResolved),
});
- resolvedDrvGoal = worker.goalFactory().makeDerivationGoal(
+ auto dependency = worker.goalFactory().makeDerivationGoal(
pathResolved, wantedOutputs, buildMode);
+ resolvedDrvGoal = dependency.first;
- state = &DerivationGoal::resolvedFinished;
- return {WaitForGoals{{resolvedDrvGoal}}};
+ (co_await waitForGoals(std::move(dependency))).value();
+ co_return co_await resolvedFinished();
}
std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths;
@@ -679,10 +674,9 @@ try {
/* Okay, try to build. Note that here we don't wait for a build
slot to become available, since we don't need one if there is a
build hook. */
- state = &DerivationGoal::tryToBuild;
- return tryToBuild(inBuildSlot);
+ co_return co_await tryToBuild();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
void DerivationGoal::started()
@@ -698,8 +692,9 @@ void DerivationGoal::started()
mcRunningBuilds = worker.runningBuilds.addTemporarily(1);
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild() noexcept
try {
+retry:
trace("trying to build");
/* Obtain locks on all output paths, if the paths are known a priori.
@@ -733,7 +728,9 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
- return {WaitForAWhile{}};
+ co_await waitForAWhile();
+ // we can loop very often, and `co_return co_await` always allocates a new frame
+ goto retry;
}
actLock.reset();
@@ -750,7 +747,7 @@ try {
if (buildMode != bmCheck && allValid) {
debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath));
outputLocks.setDeletion(true);
- return {done(BuildResult::AlreadyValid, std::move(validOutputs))};
+ co_return done(BuildResult::AlreadyValid, std::move(validOutputs));
}
/* If any of the outputs already exist but are not valid, delete
@@ -770,47 +767,56 @@ try {
&& settings.maxBuildJobs.get() != 0;
if (!buildLocally) {
- auto hookReply = tryBuildHook(inBuildSlot);
- auto result = std::visit(
- overloaded{
- [&](HookReply::Accept & a) -> std::optional<WorkResult> {
- /* Yes, it has started doing so. Wait until we get
- EOF from the hook. */
- actLock.reset();
- buildResult.startTime = time(0); // inexact
- state = &DerivationGoal::buildDone;
- started();
- return WaitForWorld{std::move(a.fds), false};
- },
- [&](HookReply::Postpone) -> std::optional<WorkResult> {
- /* Not now; wait until at least one child finishes or
- the wake-up timeout expires. */
- if (!actLock)
- actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
- fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
- outputLocks.unlock();
- return WaitForAWhile{};
- },
- [&](HookReply::Decline) -> std::optional<WorkResult> {
- /* We should do it ourselves. */
- return std::nullopt;
- },
- },
- hookReply);
- if (result) {
- return {std::move(*result)};
+ auto hookReply = tryBuildHook();
+ switch (hookReply.index()) {
+ case 0: {
+ HookReply::Accept & a = std::get<0>(hookReply);
+ /* Yes, it has started doing so. Wait until we get
+ EOF from the hook. */
+ actLock.reset();
+ buildResult.startTime = time(0); // inexact
+ started();
+ auto r = co_await a.promise;
+ if (r.has_value()) {
+ co_return co_await buildDone();
+ } else if (r.has_error()) {
+ co_return r.assume_error();
+ } else {
+ co_return r.assume_exception();
+ }
+ }
+
+ case 1: {
+ HookReply::Decline _ [[gnu::unused]] = std::get<1>(hookReply);
+ break;
+ }
+
+ case 2: {
+ HookReply::Postpone _ [[gnu::unused]] = std::get<2>(hookReply);
+ /* Not now; wait until at least one child finishes or
+ the wake-up timeout expires. */
+ if (!actLock)
+ actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
+ fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
+ outputLocks.unlock();
+ co_await waitForAWhile();
+ goto retry;
+ }
+
+ default:
+ // can't static_assert this because HookReply *subclasses* variant and std::variant_size breaks
+ assert(false && "unexpected hook reply");
}
}
actLock.reset();
- state = &DerivationGoal::tryLocalBuild;
- return tryLocalBuild(inBuildSlot);
+ co_return co_await tryLocalBuild();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild() noexcept
try {
throw Error(
"unable to build with a primary store that isn't a local store; "
@@ -857,7 +863,7 @@ void replaceValidPath(const Path & storePath, const Path & tmpPath)
// attempt to recover
movePath(oldPath, storePath);
} catch (...) {
- ignoreException();
+ ignoreExceptionExceptInterrupt();
}
throw;
}
@@ -973,10 +979,11 @@ void runPostBuildHook(
proc.getStdout()->drainInto(sink);
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone() noexcept
try {
trace("build done");
+ slotToken = {};
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
cleanupPreChildKill();
@@ -992,9 +999,6 @@ try {
buildResult.timesBuilt++;
buildResult.stopTime = time(0);
- /* So the child is gone now. */
- worker.childTerminated(this);
-
/* Close the read side of the logger pipe. */
closeReadPipes();
@@ -1095,7 +1099,7 @@ try {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished() noexcept
try {
trace("resolved derivation finished");
@@ -1168,7 +1172,7 @@ try {
return {std::current_exception()};
}
-HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
+HookReply DerivationGoal::tryBuildHook()
{
if (!worker.hook.available || !useDerivation) return HookReply::Decline{};
@@ -1180,7 +1184,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Send the request to the hook. */
worker.hook.instance->sink
<< "try"
- << (inBuildSlot ? 1 : 0)
+ << (slotToken.valid() ? 1 : 0)
<< drv->platform
<< worker.store.printStorePath(drvPath)
<< parsedDrv->getRequiredSystemFeatures();
@@ -1207,6 +1211,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
else {
s += "\n";
writeLogsToStderr(s);
+ logger->log(lvlInfo, s);
}
}
@@ -1266,12 +1271,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Create the log file and pipe. */
Path logFile = openLogFile();
- std::set<int> fds;
- fds.insert(hook->fromHook.get());
- fds.insert(hook->builderOut.get());
builderOutFD = &hook->builderOut;
-
- return HookReply::Accept{std::move(fds)};
+ return HookReply::Accept{handleChildOutput()};
}
@@ -1331,23 +1332,69 @@ void DerivationGoal::closeLogFile()
}
-Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data)
+Goal::WorkResult DerivationGoal::tooMuchLogs()
{
- assert(builderOutFD);
+ killChild();
+ return done(
+ BuildResult::LogLimitExceeded, {},
+ Error("%s killed after writing more than %d bytes of log output",
+ getName(), settings.maxLogSize));
+}
- auto tooMuchLogs = [&] {
- killChild();
- return done(
- BuildResult::LogLimitExceeded, {},
- Error("%s killed after writing more than %d bytes of log output",
- getName(), settings.maxLogSize));
- };
+struct DerivationGoal::InputStream final : private kj::AsyncObject
+{
+ int fd;
+ kj::UnixEventPort::FdObserver observer;
+
+ InputStream(kj::UnixEventPort & ep, int fd)
+ : fd(fd)
+ , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ)
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (flags < 0) {
+ throw SysError("fcntl(F_GETFL) failed on fd %i", fd);
+ }
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ throw SysError("fcntl(F_SETFL) failed on fd %i", fd);
+ }
+ }
+
+ kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer)
+ {
+ const auto res = ::read(fd, buffer.begin(), buffer.size());
+ // closing a pty endpoint causes EIO on the other endpoint. stock kj streams
+ // do not handle this and throw exceptions we can't ask for errno instead :(
+ // (we can't use `errno` either because kj may well have mangled it by now.)
+ if (res == 0 || (res == -1 && errno == EIO)) {
+ return std::string_view{};
+ }
+
+ KJ_NONBLOCKING_SYSCALL(res) {}
+
+ if (res > 0) {
+ return std::string_view{buffer.begin(), static_cast<size_t>(res)};
+ }
+
+ return observer.whenBecomesReadable().then([this, buffer] {
+ return read(buffer);
+ });
+ }
+};
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- // local & `ssh://`-builds are dealt with here.
- if (fd == builderOutFD->get()) {
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
for (auto c : data)
@@ -1362,10 +1409,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
}
if (logSink) (*logSink)(data);
- return StillAlive{};
}
+} catch (...) {
+ co_return std::current_exception();
+}
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- if (hook && fd == hook->fromHook.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine);
@@ -1381,7 +1440,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
(fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n";
logSize += logLine.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
(*logSink)(logLine);
} else if (type == resSetPhase && ! fields.is_null()) {
@@ -1405,16 +1464,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
} else
currentHookLine += c;
}
-
- return StillAlive{};
+} catch (...) {
+ co_return std::current_exception();
}
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleChildOutput() noexcept
+try {
+ assert(builderOutFD);
+
+ auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get());
+ kj::Own<InputStream> hookIn;
+ if (hook) {
+ hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get());
+ }
-void DerivationGoal::handleEOF(int fd)
+ auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn));
+
+ if (respectsTimeouts() && settings.buildTimeout != 0) {
+ handlers = handlers.exclusiveJoin(
+ worker.aio.provider->getTimer()
+ .afterDelay(settings.buildTimeout.get() * kj::SECONDS)
+ .then([this]() -> Outcome<void, WorkResult> {
+ return timedOut(
+ Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
+ );
+ })
+ );
+ }
+
+ return handlers.then([this](auto r) -> Outcome<void, WorkResult> {
+ if (!currentLogLine.empty()) flushLine();
+ return r;
+ });
+} catch (...) {
+ return {std::current_exception()};
+}
+
+kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::monitorForSilence() noexcept
{
- if (!currentLogLine.empty()) flushLine();
+ while (true) {
+ const auto stash = lastChildActivity;
+ auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS;
+ co_await worker.aio.provider->getTimer().atTime(waitUntil);
+ if (lastChildActivity == stash) {
+ co_return timedOut(
+ Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime)
+ );
+ }
+ }
}
+kj::Promise<Outcome<void, Goal::WorkResult>>
+DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
+{
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ auto handlers = kj::joinPromisesFailFast([&] {
+ kj::Vector<kj::Promise<Outcome<void, WorkResult>>> parts{2};
+
+ parts.add(handleBuilderOutput(builderIn));
+ if (hookIn) {
+ parts.add(handleHookOutput(*hookIn));
+ }
+
+ return parts.releaseAsArray();
+ }());
+
+ if (respectsTimeouts() && settings.maxSilentTime != 0) {
+ handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) {
+ return kj::arr(std::move(r));
+ }));
+ }
+
+ for (auto r : co_await handlers) {
+ BOOST_OUTCOME_CO_TRYV(r);
+ }
+ co_return result::success();
+}
void DerivationGoal::flushLine()
{
@@ -1555,11 +1681,13 @@ SingleDrvOutputs DerivationGoal::assertPathValidity()
}
-Goal::Finished DerivationGoal::done(
+Goal::WorkResult DerivationGoal::done(
BuildResult::Status status,
SingleDrvOutputs builtOutputs,
std::optional<Error> ex)
{
+ isDone = true;
+
outputLocks.unlock();
buildResult.status = status;
if (ex)
@@ -1590,7 +1718,7 @@ Goal::Finished DerivationGoal::done(
logError(ex->info());
}
- return Finished{
+ return WorkResult{
.exitCode = buildResult.success() ? ecSuccess : ecFailed,
.result = buildResult,
.ex = ex ? std::make_shared<Error>(std::move(*ex)) : nullptr,
@@ -1629,5 +1757,4 @@ void DerivationGoal::waiteeDone(GoalPtr waitee)
}
}
}
-
}
diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh
index 020388d5a..6dd58afd2 100644
--- a/src/libstore/build/derivation-goal.hh
+++ b/src/libstore/build/derivation-goal.hh
@@ -8,6 +8,7 @@
#include "store-api.hh"
#include "pathlocks.hh"
#include "goal.hh"
+#include <kj/time.h>
namespace nix {
@@ -17,7 +18,7 @@ struct HookInstance;
struct HookReplyBase {
struct [[nodiscard]] Accept {
- std::set<int> fds;
+ kj::Promise<Outcome<void, Goal::WorkResult>> promise;
};
struct [[nodiscard]] Decline {};
struct [[nodiscard]] Postpone {};
@@ -62,7 +63,7 @@ struct InitialOutputStatus {
struct InitialOutput {
bool wanted;
Hash outputHash;
- std::optional<InitialOutputStatus> known;
+ std::optional<InitialOutputStatus> known = {};
};
/**
@@ -70,6 +71,14 @@ struct InitialOutput {
*/
struct DerivationGoal : public Goal
{
+ struct InputStream;
+
+ /**
+ * Whether this goal has completed. Completed goals can not be
+ * asked for more outputs, a new goal must be created instead.
+ */
+ bool isDone = false;
+
/**
* Whether to use an on-disk .drv file.
*/
@@ -176,6 +185,11 @@ struct DerivationGoal : public Goal
std::map<std::string, InitialOutput> initialOutputs;
/**
+ * Build result.
+ */
+ BuildResult buildResult;
+
+ /**
* File descriptor for the log file.
*/
AutoCloseFD fdLogFile;
@@ -213,9 +227,6 @@ struct DerivationGoal : public Goal
*/
std::optional<DerivationType> derivationType;
- typedef kj::Promise<Result<WorkResult>> (DerivationGoal::*GoalState)(bool inBuildSlot) noexcept;
- GoalState state;
-
BuildMode buildMode;
NotifyingCounter<uint64_t>::Bump mcExpectedBuilds, mcRunningBuilds;
@@ -242,37 +253,35 @@ struct DerivationGoal : public Goal
BuildMode buildMode = bmNormal);
virtual ~DerivationGoal() noexcept(false);
- Finished timedOut(Error && ex) override;
-
- std::string key() override;
+ WorkResult timedOut(Error && ex);
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> workImpl() noexcept override;
/**
* Add wanted outputs to an already existing derivation goal.
*/
- void addWantedOutputs(const OutputsSpec & outputs);
+ bool addWantedOutputs(const OutputsSpec & outputs);
/**
* The states.
*/
- kj::Promise<Result<WorkResult>> getDerivation(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> loadDerivation(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> haveDerivation(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> outputsSubstitutionTried(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> gaveUpOnSubstitution(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> closureRepaired(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> inputsRealised(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> tryToBuild(bool inBuildSlot) noexcept;
- virtual kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> buildDone(bool inBuildSlot) noexcept;
+ kj::Promise<Result<WorkResult>> getDerivation() noexcept;
+ kj::Promise<Result<WorkResult>> loadDerivation() noexcept;
+ kj::Promise<Result<WorkResult>> haveDerivation() noexcept;
+ kj::Promise<Result<WorkResult>> outputsSubstitutionTried() noexcept;
+ kj::Promise<Result<WorkResult>> gaveUpOnSubstitution() noexcept;
+ kj::Promise<Result<WorkResult>> closureRepaired() noexcept;
+ kj::Promise<Result<WorkResult>> inputsRealised() noexcept;
+ kj::Promise<Result<WorkResult>> tryToBuild() noexcept;
+ virtual kj::Promise<Result<WorkResult>> tryLocalBuild() noexcept;
+ kj::Promise<Result<WorkResult>> buildDone() noexcept;
- kj::Promise<Result<WorkResult>> resolvedFinished(bool inBuildSlot) noexcept;
+ kj::Promise<Result<WorkResult>> resolvedFinished() noexcept;
/**
* Is the build hook willing to perform the build?
*/
- HookReply tryBuildHook(bool inBuildSlot);
+ HookReply tryBuildHook();
virtual int getChildStatus();
@@ -312,13 +321,19 @@ struct DerivationGoal : public Goal
virtual void cleanupPostOutputsRegisteredModeCheck();
virtual void cleanupPostOutputsRegisteredModeNonCheck();
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
- void handleEOF(int fd) override;
+protected:
+ kj::TimePoint lastChildActivity = kj::minValue;
+
+ kj::Promise<Outcome<void, WorkResult>> handleChildOutput() noexcept;
+ kj::Promise<Outcome<void, WorkResult>>
+ handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept;
+ kj::Promise<Outcome<void, WorkResult>> handleBuilderOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, WorkResult>> handleHookOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, WorkResult>> monitorForSilence() noexcept;
+ WorkResult tooMuchLogs();
void flushLine();
+public:
/**
* Wrappers around the corresponding Store methods that first consult the
* derivation. This is currently needed because when there is no drv file
@@ -350,13 +365,18 @@ struct DerivationGoal : public Goal
void started();
- Finished done(
+ WorkResult done(
BuildResult::Status status,
SingleDrvOutputs builtOutputs = {},
std::optional<Error> ex = {});
void waiteeDone(GoalPtr waitee) override;
+ virtual bool respectsTimeouts()
+ {
+ return false;
+ }
+
StorePathSet exportReferences(const StorePathSet & storePaths);
JobCategory jobCategory() const override {
diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc
index 7986123cc..f04beb884 100644
--- a/src/libstore/build/drv-output-substitution-goal.cc
+++ b/src/libstore/build/drv-output-substitution-goal.cc
@@ -4,6 +4,9 @@
#include "worker.hh"
#include "substitution-goal.hh"
#include "signals.hh"
+#include <kj/array.h>
+#include <kj/async.h>
+#include <kj/vector.h>
namespace nix {
@@ -16,33 +19,32 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal(
: Goal(worker, isDependency)
, id(id)
{
- state = &DrvOutputSubstitutionGoal::init;
name = fmt("substitution of '%s'", id.to_string());
trace("created");
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::init(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::workImpl() noexcept
try {
trace("init");
/* If the derivation already exists, we’re done */
if (worker.store.queryRealisation(id)) {
- return {Finished{ecSuccess, std::move(buildResult)}};
+ co_return WorkResult{ecSuccess};
}
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::tryNext() noexcept
try {
trace("trying next substituter");
- if (!inBuildSlot) {
- return {WaitForSlot{}};
+ if (!slotToken.valid()) {
+ slotToken = co_await worker.substitutions.acquire();
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -59,7 +61,7 @@ try {
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
- return {Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)}};
+ co_return WorkResult{substituterFailed ? ecFailed : ecNoSubstituters};
}
sub = subs.front();
@@ -69,25 +71,26 @@ try {
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>();
- downloadState->outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ downloadState->outPipe = kj::mv(pipe.fulfiller);
downloadState->result =
std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] {
+ Finally updateStats([&]() { downloadState->outPipe->fulfill(); });
ReceiveInterrupts receiveInterrupts;
- Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
return sub->queryRealisation(id);
});
- state = &DrvOutputSubstitutionGoal::realisationFetched;
- return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}};
+ co_await pipe.promise;
+ co_return co_await realisationFetched();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched() noexcept
try {
- worker.childTerminated(this);
maintainRunningSubstitutions.reset();
+ slotToken = {};
try {
outputInfo = downloadState->result.get();
@@ -97,10 +100,10 @@ try {
}
if (!outputInfo) {
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
}
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId);
@@ -114,34 +117,31 @@ try {
worker.store.printStorePath(localOutputInfo->outPath),
worker.store.printStorePath(depPath)
);
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
}
- result.goals.insert(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId));
+ dependencies.add(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId));
}
}
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath));
- if (result.goals.empty()) {
- return outPathValid(inBuildSlot);
- } else {
- state = &DrvOutputSubstitutionGoal::outPathValid;
- return {std::move(result)};
+ if (!dependencies.empty()) {
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await outPathValid();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::outPathValid(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::outPathValid() noexcept
try {
assert(outputInfo);
trace("output path substituted");
if (nrFailed > 0) {
debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
- return {Finished{
+ return {WorkResult{
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
- std::move(buildResult),
}};
}
@@ -154,22 +154,9 @@ try {
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::finished() noexcept
try {
trace("finished");
- return {Finished{ecSuccess, std::move(buildResult)}};
+ return {WorkResult{ecSuccess}};
} catch (...) {
return {std::current_exception()};
}
-std::string DrvOutputSubstitutionGoal::key()
-{
- /* "a$" ensures substitution goals happen before derivation
- goals. */
- return "a$" + std::string(id.to_string());
-}
-
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
-{
- return (this->*state)(inBuildSlot);
-}
-
-
}
diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh
index f33196665..f959e2a7b 100644
--- a/src/libstore/build/drv-output-substitution-goal.hh
+++ b/src/libstore/build/drv-output-substitution-goal.hh
@@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal {
struct DownloadState
{
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
std::future<std::shared_ptr<const Realisation>> result;
};
@@ -65,20 +65,12 @@ public:
std::optional<ContentAddress> ca = std::nullopt
);
- typedef kj::Promise<Result<WorkResult>> (DrvOutputSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept;
- GoalState state;
-
- kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> realisationFetched(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept;
+ kj::Promise<Result<WorkResult>> tryNext() noexcept;
+ kj::Promise<Result<WorkResult>> realisationFetched() noexcept;
+ kj::Promise<Result<WorkResult>> outPathValid() noexcept;
kj::Promise<Result<WorkResult>> finished() noexcept;
- Finished timedOut(Error && ex) override { abort(); };
-
- std::string key() override;
-
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> workImpl() noexcept override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
diff --git a/src/libstore/build/entry-points.cc b/src/libstore/build/entry-points.cc
index a0f18a02c..808179a4d 100644
--- a/src/libstore/build/entry-points.cc
+++ b/src/libstore/build/entry-points.cc
@@ -17,22 +17,22 @@ void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMod
Worker worker(*this, evalStore ? *evalStore : *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- Goals goals;
+ Worker::Targets goals;
for (auto & br : reqs)
- goals.insert(gf.makeGoal(br, buildMode));
+ goals.emplace(gf.makeGoal(br, buildMode));
return goals;
});
StringSet failed;
std::shared_ptr<Error> ex;
- for (auto & i : goals) {
- if (i->ex) {
+ for (auto & [i, result] : goals) {
+ if (result.ex) {
if (ex)
- logError(i->ex->info());
+ logError(result.ex->info());
else
- ex = i->ex;
+ ex = result.ex;
}
- if (i->exitCode != Goal::ecSuccess) {
+ if (result.exitCode != Goal::ecSuccess) {
if (auto i2 = dynamic_cast<DerivationGoal *>(i.get()))
failed.insert(printStorePath(i2->drvPath));
else if (auto i2 = dynamic_cast<PathSubstitutionGoal *>(i.get()))
@@ -60,11 +60,11 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults(
std::vector<std::pair<const DerivedPath &, GoalPtr>> state;
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- Goals goals;
+ Worker::Targets goals;
for (const auto & req : reqs) {
auto goal = gf.makeGoal(req, buildMode);
- goals.insert(goal);
- state.push_back({req, goal});
+ state.push_back({req, goal.first});
+ goals.emplace(std::move(goal));
}
return goals;
});
@@ -72,7 +72,7 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults(
std::vector<KeyedBuildResult> results;
for (auto & [req, goalPtr] : state)
- results.emplace_back(goalPtr->buildResult.restrictTo(req));
+ results.emplace_back(goals[goalPtr].result.restrictTo(req));
return results;
}
@@ -84,11 +84,13 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat
Worker worker(*this, *this, aio);
try {
- auto goals = runWorker(worker, [&](GoalFactory & gf) -> Goals {
- return Goals{gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)};
+ auto goals = runWorker(worker, [&](GoalFactory & gf) {
+ Worker::Targets goals;
+ goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode));
+ return goals;
});
- auto goal = *goals.begin();
- return goal->buildResult.restrictTo(DerivedPath::Built {
+ auto [goal, result] = *goals.begin();
+ return result.result.restrictTo(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath),
.outputs = OutputsSpec::All {},
});
@@ -110,14 +112,16 @@ void Store::ensurePath(const StorePath & path)
Worker worker(*this, *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- return Goals{gf.makePathSubstitutionGoal(path)};
+ Worker::Targets goals;
+ goals.emplace(gf.makePathSubstitutionGoal(path));
+ return goals;
});
- auto goal = *goals.begin();
+ auto [goal, result] = *goals.begin();
- if (goal->exitCode != Goal::ecSuccess) {
- if (goal->ex) {
- goal->ex->withExitStatus(worker.failingExitStatus());
- throw std::move(*goal->ex);
+ if (result.exitCode != Goal::ecSuccess) {
+ if (result.ex) {
+ result.ex->withExitStatus(worker.failingExitStatus());
+ throw std::move(*result.ex);
} else
throw Error(worker.failingExitStatus(), "path '%s' does not exist and cannot be created", printStorePath(path));
}
@@ -130,24 +134,28 @@ void Store::repairPath(const StorePath & path)
Worker worker(*this, *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- return Goals{gf.makePathSubstitutionGoal(path, Repair)};
+ Worker::Targets goals;
+ goals.emplace(gf.makePathSubstitutionGoal(path, Repair));
+ return goals;
});
- auto goal = *goals.begin();
+ auto [goal, result] = *goals.begin();
- if (goal->exitCode != Goal::ecSuccess) {
+ if (result.exitCode != Goal::ecSuccess) {
/* Since substituting the path didn't work, if we have a valid
deriver, then rebuild the deriver. */
auto info = queryPathInfo(path);
if (info->deriver && isValidPath(*info->deriver)) {
worker.run([&](GoalFactory & gf) {
- return Goals{gf.makeGoal(
+ Worker::Targets goals;
+ goals.emplace(gf.makeGoal(
DerivedPath::Built{
.drvPath = makeConstantStorePathRef(*info->deriver),
// FIXME: Should just build the specific output we need.
.outputs = OutputsSpec::All{},
},
bmRepair
- )};
+ ));
+ return goals;
});
} else
throw Error(worker.failingExitStatus(), "cannot repair path '%s'", printStorePath(path));
diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc
index 82861ad2b..02b22b8ad 100644
--- a/src/libstore/build/goal.cc
+++ b/src/libstore/build/goal.cc
@@ -1,18 +1,73 @@
#include "goal.hh"
+#include "async-collect.hh"
+#include "worker.hh"
+#include <boost/outcome/try.hpp>
+#include <kj/time.h>
namespace nix {
-bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const {
- std::string s1 = a->key();
- std::string s2 = b->key();
- return s1 < s2;
-}
-
-
void Goal::trace(std::string_view s)
{
debug("%1%: %2%", name, s);
}
+kj::Promise<void> Goal::waitForAWhile()
+{
+ trace("wait for a while");
+ /* If we are polling goals that are waiting for a lock, then wake
+ up after a few seconds at most. */
+ return worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
+}
+
+kj::Promise<Result<Goal::WorkResult>> Goal::work() noexcept
+try {
+ BOOST_OUTCOME_CO_TRY(auto result, co_await workImpl());
+
+ trace("done");
+
+ cleanup();
+
+ co_return std::move(result);
+} catch (...) {
+ co_return result::failure(std::current_exception());
+}
+
+kj::Promise<Result<void>>
+Goal::waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies) noexcept
+try {
+ auto left = dependencies.size();
+ for (auto & [dep, p] : dependencies) {
+ p = p.then([this, dep, &left](auto _result) -> Result<WorkResult> {
+ BOOST_OUTCOME_TRY(auto result, _result);
+
+ left--;
+ trace(fmt("waitee '%s' done; %d left", dep->name, left));
+
+ if (result.exitCode != Goal::ecSuccess) ++nrFailed;
+ if (result.exitCode == Goal::ecNoSubstituters) ++nrNoSubstituters;
+ if (result.exitCode == Goal::ecIncompleteClosure) ++nrIncompleteClosure;
+
+ return std::move(result);
+ }).eagerlyEvaluate(nullptr);
+ }
+
+ auto collectDeps = asyncCollect(std::move(dependencies));
+
+ while (auto item = co_await collectDeps.next()) {
+ auto & [dep, _result] = *item;
+ BOOST_OUTCOME_CO_TRY(auto result, _result);
+
+ waiteeDone(dep);
+
+ if (result.exitCode == ecFailed && !settings.keepGoing) {
+ co_return result::success();
+ }
+ }
+
+ co_return result::success();
+} catch (...) {
+ co_return result::failure(std::current_exception());
+}
+
}
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index 189505308..29540dcd3 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -1,10 +1,12 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "result.hh"
#include "types.hh"
#include "store-api.hh"
#include "build-result.hh"
+#include <concepts> // IWYU pragma: keep
#include <kj/async.h>
namespace nix {
@@ -19,22 +21,11 @@ class Worker;
* A pointer to a goal.
*/
typedef std::shared_ptr<Goal> GoalPtr;
-typedef std::weak_ptr<Goal> WeakGoalPtr;
-
-struct CompareGoalPtrs {
- bool operator() (const GoalPtr & a, const GoalPtr & b) const;
-};
/**
* Set of goals.
*/
-typedef std::set<GoalPtr, CompareGoalPtrs> Goals;
-typedef std::set<WeakGoalPtr, std::owner_less<WeakGoalPtr>> WeakGoals;
-
-/**
- * A map of paths to goals (and the other way around).
- */
-typedef std::map<StorePath, WeakGoalPtr> WeakGoalMap;
+typedef std::set<GoalPtr> Goals;
/**
* Used as a hint to the worker on how to schedule a particular goal. For example,
@@ -70,17 +61,6 @@ struct Goal
const bool isDependency;
/**
- * Goals that this goal is waiting for.
- */
- Goals waitees;
-
- /**
- * Goals waiting for this one to finish. Must use weak pointers
- * here to prevent cycles.
- */
- WeakGoals waiters;
-
- /**
* Number of goals we are/were waiting for that have failed.
*/
size_t nrFailed = 0;
@@ -102,57 +82,37 @@ struct Goal
*/
std::string name;
- /**
- * Whether the goal is finished.
- */
- std::optional<ExitCode> exitCode;
-
- /**
- * Build result.
- */
- BuildResult buildResult;
+protected:
+ AsyncSemaphore::Token slotToken;
public:
-
- struct [[nodiscard]] StillAlive {};
- struct [[nodiscard]] WaitForSlot {};
- struct [[nodiscard]] WaitForAWhile {};
- struct [[nodiscard]] ContinueImmediately {};
- struct [[nodiscard]] WaitForGoals {
- Goals goals;
- };
- struct [[nodiscard]] WaitForWorld {
- std::set<int> fds;
- bool inBuildSlot;
- };
- struct [[nodiscard]] Finished {
+ struct [[nodiscard]] WorkResult {
ExitCode exitCode;
- BuildResult result;
- std::shared_ptr<Error> ex;
+ BuildResult result = {};
+ std::shared_ptr<Error> ex = {};
bool permanentFailure = false;
bool timedOut = false;
bool hashMismatch = false;
bool checkMismatch = false;
};
- struct [[nodiscard]] WorkResult : std::variant<
- StillAlive,
- WaitForSlot,
- WaitForAWhile,
- ContinueImmediately,
- WaitForGoals,
- WaitForWorld,
- Finished>
+protected:
+ kj::Promise<void> waitForAWhile();
+ kj::Promise<Result<void>>
+ waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies) noexcept;
+
+ template<std::derived_from<Goal>... G>
+ kj::Promise<Result<void>>
+ waitForGoals(std::pair<std::shared_ptr<G>, kj::Promise<Result<WorkResult>>>... goals) noexcept
{
- WorkResult() = delete;
- using variant::variant;
- };
+ return waitForGoals(
+ kj::arrOf<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>>(std::move(goals)...)
+ );
+ }
- /**
- * Exception containing an error message, if any.
- */
- std::shared_ptr<Error> ex;
+ virtual kj::Promise<Result<WorkResult>> workImpl() noexcept = 0;
+public:
explicit Goal(Worker & worker, bool isDependency)
: worker(worker)
, isDependency(isDependency)
@@ -163,24 +123,10 @@ public:
trace("goal destroyed");
}
- virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
+ kj::Promise<Result<WorkResult>> work() noexcept;
virtual void waiteeDone(GoalPtr waitee) { }
- virtual WorkResult handleChildOutput(int fd, std::string_view data)
- {
- abort();
- }
-
- virtual void handleEOF(int fd)
- {
- }
-
- virtual bool respectsTimeouts()
- {
- return false;
- }
-
void trace(std::string_view s);
std::string getName() const
@@ -188,15 +134,6 @@ public:
return name;
}
- /**
- * Callback in case of a timeout. It should wake up its waiters,
- * get rid of any running child processes that are being monitored
- * by the worker (important!), etc.
- */
- virtual Finished timedOut(Error && ex) = 0;
-
- virtual std::string key() = 0;
-
virtual void cleanup() { }
/**
diff --git a/src/libstore/build/hook-instance.cc b/src/libstore/build/hook-instance.cc
index f91a904cc..521f34917 100644
--- a/src/libstore/build/hook-instance.cc
+++ b/src/libstore/build/hook-instance.cc
@@ -1,4 +1,5 @@
#include "child.hh"
+#include "error.hh"
#include "file-system.hh"
#include "globals.hh"
#include "hook-instance.hh"
@@ -86,7 +87,7 @@ HookInstance::~HookInstance()
toHook.reset();
if (pid) pid.kill();
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index 4baa525d9..c8c68f99f 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -1,4 +1,5 @@
#include "local-derivation-goal.hh"
+#include "error.hh"
#include "indirect-root-store.hh"
#include "machines.hh"
#include "store-api.hh"
@@ -98,9 +99,9 @@ LocalDerivationGoal::~LocalDerivationGoal() noexcept(false)
{
/* Careful: we should never ever throw an exception from a
destructor. */
- try { deleteTmpDir(false); } catch (...) { ignoreException(); }
- try { killChild(); } catch (...) { ignoreException(); }
- try { stopDaemon(); } catch (...) { ignoreException(); }
+ try { deleteTmpDir(false); } catch (...) { ignoreExceptionInDestructor(); }
+ try { killChild(); } catch (...) { ignoreExceptionInDestructor(); }
+ try { stopDaemon(); } catch (...) { ignoreExceptionInDestructor(); }
}
@@ -121,8 +122,6 @@ LocalStore & LocalDerivationGoal::getLocalStore()
void LocalDerivationGoal::killChild()
{
if (pid) {
- worker.childTerminated(this);
-
/* If we're using a build user, then there is a tricky race
condition: if we kill the build user before the child has
done its setuid() to the build user uid, then it won't be
@@ -149,17 +148,18 @@ void LocalDerivationGoal::killSandbox(bool getStats)
}
-kj::Promise<Result<Goal::WorkResult>> LocalDerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> LocalDerivationGoal::tryLocalBuild() noexcept
try {
+retry:
#if __APPLE__
additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or("");
#endif
- if (!inBuildSlot) {
- state = &DerivationGoal::tryToBuild;
+ if (!slotToken.valid()) {
outputLocks.unlock();
- if (0U != settings.maxBuildJobs) {
- return {WaitForSlot{}};
+ if (worker.localBuilds.capacity() > 0) {
+ slotToken = co_await worker.localBuilds.acquire();
+ co_return co_await tryToBuild();
}
if (getMachines().empty()) {
throw Error(
@@ -214,7 +214,9 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
- return {WaitForAWhile{}};
+ co_await waitForAWhile();
+ // we can loop very often, and `co_return co_await` always allocates a new frame
+ goto retry;
}
}
@@ -243,24 +245,29 @@ try {
try {
/* Okay, we have to build. */
- auto fds = startBuilder();
-
- /* This state will be reached when we get EOF on the child's
- log pipe. */
- state = &DerivationGoal::buildDone;
+ auto promise = startBuilder();
started();
- return {WaitForWorld{std::move(fds), true}};
+ auto r = co_await promise;
+ if (r.has_value()) {
+ // all good so far
+ } else if (r.has_error()) {
+ co_return r.assume_error();
+ } else {
+ co_return r.assume_exception();
+ }
} catch (BuildError & e) {
outputLocks.unlock();
buildUser.reset();
auto report = done(BuildResult::InputRejected, {}, std::move(e));
report.permanentFailure = true;
- return {std::move(report)};
+ co_return report;
}
+
+ co_return co_await buildDone();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
@@ -390,7 +397,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
cleanupPostOutputsRegisteredModeCheck();
}
-std::set<int> LocalDerivationGoal::startBuilder()
+// NOTE this one isn't noexcept because it's called from places that expect
+// exceptions to signal failure to launch. we should change this some time.
+kj::Promise<Outcome<void, Goal::WorkResult>> LocalDerivationGoal::startBuilder()
{
if ((buildUser && buildUser->getUIDCount() != 1)
#if __linux__
@@ -779,7 +788,7 @@ std::set<int> LocalDerivationGoal::startBuilder()
msgs.push_back(std::move(msg));
}
- return {builderOutPTY.get()};
+ return handleChildOutput();
}
@@ -1241,7 +1250,7 @@ void LocalDerivationGoal::startDaemon()
NotTrusted, daemon::Recursive);
debug("terminated daemon connection");
} catch (SysError &) {
- ignoreException();
+ ignoreExceptionExceptInterrupt();
}
});
@@ -1361,13 +1370,20 @@ void LocalDerivationGoal::runChild()
bool setUser = true;
- /* Make the contents of netrc available to builtin:fetchurl
- (which may run under a different uid and/or in a sandbox). */
+ /* Make the contents of netrc and the CA certificate bundle
+ available to builtin:fetchurl (which may run under a
+ different uid and/or in a sandbox). */
std::string netrcData;
- try {
- if (drv->isBuiltin() && drv->builder == "builtin:fetchurl" && !derivationType->isSandboxed())
+ std::string caFileData;
+ if (drv->isBuiltin() && drv->builder == "builtin:fetchurl" && !derivationType->isSandboxed()) {
+ try {
netrcData = readFile(settings.netrcFile);
- } catch (SysError &) { }
+ } catch (SysError &) { }
+
+ try {
+ caFileData = readFile(settings.caFile);
+ } catch (SysError &) { }
+ }
#if __linux__
if (useChroot) {
@@ -1802,7 +1818,7 @@ void LocalDerivationGoal::runChild()
e.second = rewriteStrings(e.second, inputRewrites);
if (drv->builder == "builtin:fetchurl")
- builtinFetchurl(drv2, netrcData);
+ builtinFetchurl(drv2, netrcData, caFileData);
else if (drv->builder == "builtin:buildenv")
builtinBuildenv(drv2);
else if (drv->builder == "builtin:unpack-channel")
diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh
index cd040bc15..cd6ea2b55 100644
--- a/src/libstore/build/local-derivation-goal.hh
+++ b/src/libstore/build/local-derivation-goal.hh
@@ -182,7 +182,7 @@ struct LocalDerivationGoal : public DerivationGoal
* Create a LocalDerivationGoal without an on-disk .drv file,
* possibly a platform-specific subclass
*/
- static std::shared_ptr<LocalDerivationGoal> makeLocalDerivationGoal(
+ static std::unique_ptr<LocalDerivationGoal> makeLocalDerivationGoal(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs,
Worker & worker,
@@ -194,7 +194,7 @@ struct LocalDerivationGoal : public DerivationGoal
* Create a LocalDerivationGoal for an on-disk .drv file,
* possibly a platform-specific subclass
*/
- static std::shared_ptr<LocalDerivationGoal> makeLocalDerivationGoal(
+ static std::unique_ptr<LocalDerivationGoal> makeLocalDerivationGoal(
const StorePath & drvPath,
const BasicDerivation & drv,
const OutputsSpec & wantedOutputs,
@@ -213,12 +213,12 @@ struct LocalDerivationGoal : public DerivationGoal
/**
* The additional states.
*/
- kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> tryLocalBuild() noexcept override;
/**
* Start building a derivation.
*/
- std::set<int> startBuilder();
+ kj::Promise<Outcome<void, WorkResult>> startBuilder();
/**
* Fill in the environment for the builder.
diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc
index bd0ffcb9b..e0ca23a86 100644
--- a/src/libstore/build/substitution-goal.cc
+++ b/src/libstore/build/substitution-goal.cc
@@ -3,6 +3,8 @@
#include "nar-info.hh"
#include "signals.hh"
#include "finally.hh"
+#include <kj/array.h>
+#include <kj/vector.h>
namespace nix {
@@ -18,7 +20,6 @@ PathSubstitutionGoal::PathSubstitutionGoal(
, repair(repair)
, ca(ca)
{
- state = &PathSubstitutionGoal::init;
name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath));
trace("created");
maintainExpectedSubstitutions = worker.expectedSubstitutions.addTemporarily(1);
@@ -31,27 +32,21 @@ PathSubstitutionGoal::~PathSubstitutionGoal()
}
-Goal::Finished PathSubstitutionGoal::done(
+Goal::WorkResult PathSubstitutionGoal::done(
ExitCode result,
BuildResult::Status status,
std::optional<std::string> errorMsg)
{
- buildResult.status = status;
+ BuildResult buildResult{.status = status};
if (errorMsg) {
debug(*errorMsg);
buildResult.errorMsg = *errorMsg;
}
- return Finished{result, std::move(buildResult)};
+ return WorkResult{result, std::move(buildResult)};
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
-{
- return (this->*state)(inBuildSlot);
-}
-
-
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::init(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::workImpl() noexcept
try {
trace("init");
@@ -67,13 +62,13 @@ try {
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
- return tryNext(inBuildSlot);
+ return tryNext();
} catch (...) {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryNext(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryNext() noexcept
try {
trace("trying next substituter");
@@ -89,10 +84,10 @@ try {
/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
- return {done(
+ co_return done(
substituterFailed ? ecFailed : ecNoSubstituters,
BuildResult::NoSubstituters,
- fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)))};
+ fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)));
}
sub = subs.front();
@@ -105,26 +100,28 @@ try {
if (sub->storeDir == worker.store.storeDir)
assert(subPath == storePath);
} else if (sub->storeDir != worker.store.storeDir) {
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
}
- try {
- // FIXME: make async
- info = sub->queryPathInfo(subPath ? *subPath : storePath);
- } catch (InvalidPath &) {
- return tryNext(inBuildSlot);
- } catch (SubstituterDisabled &) {
- if (settings.tryFallback) {
- return tryNext(inBuildSlot);
- }
- throw;
- } catch (Error & e) {
- if (settings.tryFallback) {
- logError(e.info());
- return tryNext(inBuildSlot);
+ do {
+ try {
+ // FIXME: make async
+ info = sub->queryPathInfo(subPath ? *subPath : storePath);
+ break;
+ } catch (InvalidPath &) {
+ } catch (SubstituterDisabled &) {
+ if (!settings.tryFallback) {
+ throw;
+ }
+ } catch (Error & e) {
+ if (settings.tryFallback) {
+ logError(e.info());
+ } else {
+ throw;
+ }
}
- throw;
- }
+ co_return co_await tryNext();
+ } while (false);
if (info->path != storePath) {
if (info->isContentAddressed(*sub) && info->references.empty()) {
@@ -134,7 +131,7 @@ try {
} else {
printError("asked '%s' for '%s' but got '%s'",
sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path));
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
}
}
@@ -155,28 +152,26 @@ try {
{
warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'",
worker.store.printStorePath(storePath), sub->getUri());
- return tryNext(inBuildSlot);
+ co_return co_await tryNext();
}
/* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<Result<WorkResult>>>> dependencies;
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i));
- if (result.goals.empty()) {/* to prevent hang (no wake-up event) */
- return referencesValid(inBuildSlot);
- } else {
- state = &PathSubstitutionGoal::referencesValid;
- return {std::move(result)};
+ if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */
+ (co_await waitForGoals(dependencies.releaseAsArray())).value();
}
+ co_return co_await referencesValid();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::referencesValid(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::referencesValid() noexcept
try {
trace("all references realised");
@@ -191,33 +186,33 @@ try {
if (i != storePath) /* ignore self-references */
assert(worker.store.isValidPath(i));
- state = &PathSubstitutionGoal::tryToRun;
- return tryToRun(inBuildSlot);
+ return tryToRun();
} catch (...) {
return {std::current_exception()};
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryToRun(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryToRun() noexcept
try {
trace("trying to run");
- if (!inBuildSlot) {
- return {WaitForSlot{}};
+ if (!slotToken.valid()) {
+ slotToken = co_await worker.substitutions.acquire();
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
- outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ outPipe = kj::mv(pipe.fulfiller);
thr = std::async(std::launch::async, [this]() {
+ /* Wake up the worker loop when we're done. */
+ Finally updateStats([this]() { outPipe->fulfill(); });
+
auto & fetchPath = subPath ? *subPath : storePath;
try {
ReceiveInterrupts receiveInterrupts;
- /* Wake up the worker loop when we're done. */
- Finally updateStats([this]() { outPipe.writeSide.close(); });
-
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
@@ -233,39 +228,39 @@ try {
}
});
- state = &PathSubstitutionGoal::finished;
- return {WaitForWorld{{outPipe.readSide.get()}, true}};
+ co_await pipe.promise;
+ co_return co_await finished();
} catch (...) {
- return {std::current_exception()};
+ co_return result::failure(std::current_exception());
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished() noexcept
try {
trace("substitute finished");
- worker.childTerminated(this);
-
- try {
- thr.get();
- } catch (std::exception & e) {
- printError(e.what());
-
- /* Cause the parent build to fail unless --fallback is given,
- or the substitute has disappeared. The latter case behaves
- the same as the substitute never having existed in the
- first place. */
+ do {
try {
- throw;
- } catch (SubstituteGone &) {
- } catch (...) {
- substituterFailed = true;
+ slotToken = {};
+ thr.get();
+ break;
+ } catch (std::exception & e) {
+ printError(e.what());
+
+ /* Cause the parent build to fail unless --fallback is given,
+ or the substitute has disappeared. The latter case behaves
+ the same as the substitute never having existed in the
+ first place. */
+ try {
+ throw;
+ } catch (SubstituteGone &) {
+ } catch (...) {
+ substituterFailed = true;
+ }
}
-
/* Try the next substitute. */
- state = &PathSubstitutionGoal::tryNext;
- return tryNext(inBuildSlot);
- }
+ co_return co_await tryNext();
+ } while (false);
worker.markContentsGood(storePath);
@@ -282,15 +277,9 @@ try {
worker.doneNarSize += maintainExpectedNar.delta();
maintainExpectedNar.reset();
- return {done(ecSuccess, BuildResult::Substituted)};
+ co_return done(ecSuccess, BuildResult::Substituted);
} catch (...) {
- return {std::current_exception()};
-}
-
-
-Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data)
-{
- return StillAlive{};
+ co_return result::failure(std::current_exception());
}
@@ -300,12 +289,9 @@ void PathSubstitutionGoal::cleanup()
if (thr.valid()) {
// FIXME: signal worker thread to quit.
thr.get();
- worker.childTerminated(this);
}
-
- outPipe.close();
} catch (...) {
- ignoreException();
+ ignoreExceptionInDestructor();
}
}
diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh
index 3c97b19fd..18b4262a4 100644
--- a/src/libstore/build/substitution-goal.hh
+++ b/src/libstore/build/substitution-goal.hh
@@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal
/**
* Pipe for the substituter's standard output.
*/
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
/**
* The substituter thread.
@@ -67,15 +67,12 @@ struct PathSubstitutionGoal : public Goal
NotifyingCounter<uint64_t>::Bump maintainExpectedSubstitutions,
maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload;
- typedef kj::Promise<Result<WorkResult>> (PathSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept;
- GoalState state;
-
/**
* Content address for recomputing store path
*/
std::optional<ContentAddress> ca;
- Finished done(
+ WorkResult done(
ExitCode result,
BuildResult::Status status,
std::optional<std::string> errorMsg = {});
@@ -90,32 +87,15 @@ public:
);
~PathSubstitutionGoal();
- Finished timedOut(Error && ex) override { abort(); };
-
- /**
- * We prepend "a$" to the key name to ensure substitution goals
- * happen before derivation goals.
- */
- std::string key() override
- {
- return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
- }
-
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> workImpl() noexcept override;
/**
* The states.
*/
- kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> referencesValid(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept;
- kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept;
-
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
+ kj::Promise<Result<WorkResult>> tryNext() noexcept;
+ kj::Promise<Result<WorkResult>> referencesValid() noexcept;
+ kj::Promise<Result<WorkResult>> tryToRun() noexcept;
+ kj::Promise<Result<WorkResult>> finished() noexcept;
/* Called by destructor, can't be overridden */
void cleanup() override final;
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..10f58f5d3 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -1,3 +1,4 @@
+#include "async-collect.hh"
#include "charptr-cast.hh"
#include "worker.hh"
#include "finally.hh"
@@ -6,11 +7,22 @@
#include "local-derivation-goal.hh"
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-
-#include <poll.h>
+#include <boost/outcome/try.hpp>
+#include <kj/vector.h>
namespace nix {
+namespace {
+struct ErrorHandler : kj::TaskSet::ErrorHandler
+{
+ void taskFailed(kj::Exception && e) override
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
+ abort();
+ }
+} errorHandler;
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,11 +30,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
- lastWokenUp = steady_time_point::min();
}
@@ -32,7 +46,11 @@ Worker::~Worker()
goals that refer to this worker should be gone. (Otherwise we
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
- topGoals.clear();
+ children.clear();
+
+ derivationGoals.clear();
+ drvOutputSubstitutionGoals.clear();
+ substitutionGoals.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -40,292 +58,158 @@ Worker::~Worker()
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
- const StorePath & drvPath,
- const OutputsSpec & wantedOutputs,
- std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal)
+template<typename ID, std::derived_from<Goal> G>
+std::pair<std::shared_ptr<G>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoalCommon(
+ std::map<ID, CachedGoal<G>> & map,
+ const ID & key,
+ InvocableR<std::unique_ptr<G>> auto create,
+ InvocableR<bool, G &> auto modify
+)
{
- std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath];
- std::shared_ptr<DerivationGoal> goal = goal_weak.lock();
- if (!goal) {
- goal = mkDrvGoal();
- goal_weak = goal;
- wakeUp(goal);
- } else {
- goal->addWantedOutputs(wantedOutputs);
+ auto [it, _inserted] = map.try_emplace(key);
+ // try twice to create the goal. we can only loop if we hit the continue,
+ // and then we only want to recreate the goal *once*. concurrent accesses
+ // to the worker are not sound, we want to catch them if at all possible.
+ for ([[maybe_unused]] auto _attempt : {1, 2}) {
+ auto & cachedGoal = it->second;
+ auto & goal = cachedGoal.goal;
+ if (!goal) {
+ goal = create();
+ // do not start working immediately. if we are not yet running we
+ // may create dependencies as though they were toplevel goals, in
+ // which case the dependencies will not report build errors. when
+ // we are running we may be called for this same goal more times,
+ // and then we want to modify rather than recreate when possible.
+ auto removeWhenDone = [goal, &map, it] {
+ // c++ lambda coroutine capture semantics are *so* fucked up.
+ return [](auto goal, auto & map, auto it) -> kj::Promise<Result<Goal::WorkResult>> {
+ auto result = co_await goal->work();
+ // a concurrent call to makeGoalCommon may have reset our
+ // cached goal and replaced it with a new instance. don't
+ // remove the goal in this case, otherwise we will crash.
+ if (goal == it->second.goal) {
+ map.erase(it);
+ }
+ co_return result;
+ }(goal, map, it);
+ };
+ cachedGoal.promise = kj::evalLater(std::move(removeWhenDone)).fork();
+ children.add(cachedGoal.promise.addBranch().then([this](auto _result) {
+ if (_result.has_value()) {
+ auto & result = _result.value();
+ permanentFailure |= result.permanentFailure;
+ timedOut |= result.timedOut;
+ hashMismatch |= result.hashMismatch;
+ checkMismatch |= result.checkMismatch;
+ }
+ }));
+ } else {
+ if (!modify(*goal)) {
+ cachedGoal = {};
+ continue;
+ }
+ }
+ return {goal, cachedGoal.promise.addBranch()};
}
- return goal;
+ assert(false && "could not make a goal. possible concurrent worker access");
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath,
- const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeDerivationGoal(
+ const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode
+)
{
- return makeDerivationGoalCommon(
+ return makeGoalCommon(
+ derivationGoals,
drvPath,
- wantedOutputs,
- [&]() -> std::shared_ptr<DerivationGoal> {
+ [&]() -> std::unique_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
- ? std::make_shared<DerivationGoal>(
+ ? std::make_unique<DerivationGoal>(
drvPath, wantedOutputs, *this, running, buildMode
)
: LocalDerivationGoal::makeLocalDerivationGoal(
drvPath, wantedOutputs, *this, running, buildMode
);
- }
+ },
+ [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); }
);
}
-std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath,
- const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> Worker::makeBasicDerivationGoal(
+ const StorePath & drvPath,
+ const BasicDerivation & drv,
+ const OutputsSpec & wantedOutputs,
+ BuildMode buildMode
+)
{
- return makeDerivationGoalCommon(
+ return makeGoalCommon(
+ derivationGoals,
drvPath,
- wantedOutputs,
- [&]() -> std::shared_ptr<DerivationGoal> {
+ [&]() -> std::unique_ptr<DerivationGoal> {
return !dynamic_cast<LocalStore *>(&store)
- ? std::make_shared<DerivationGoal>(
+ ? std::make_unique<DerivationGoal>(
drvPath, drv, wantedOutputs, *this, running, buildMode
)
: LocalDerivationGoal::makeLocalDerivationGoal(
drvPath, drv, wantedOutputs, *this, running, buildMode
);
- }
+ },
+ [&](DerivationGoal & g) { return g.addWantedOutputs(wantedOutputs); }
);
}
-std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+Worker::makePathSubstitutionGoal(
+ const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path];
- auto goal = goal_weak.lock(); // FIXME
- if (!goal) {
- goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca);
- goal_weak = goal;
- wakeUp(goal);
- }
- return goal;
+ return makeGoalCommon(
+ substitutionGoals,
+ path,
+ [&] { return std::make_unique<PathSubstitutionGoal>(path, *this, running, repair, ca); },
+ [&](auto &) { return true; }
+ );
}
-std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+Worker::makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id];
- auto goal = goal_weak.lock(); // FIXME
- if (!goal) {
- goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca);
- goal_weak = goal;
- wakeUp(goal);
- }
- return goal;
+ return makeGoalCommon(
+ drvOutputSubstitutionGoals,
+ id,
+ [&] { return std::make_unique<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); },
+ [&](auto &) { return true; }
+ );
}
-GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
+std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{
return std::visit(overloaded {
- [&](const DerivedPath::Built & bfd) -> GoalPtr {
+ [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> {
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
},
- [&](const DerivedPath::Opaque & bo) -> GoalPtr {
+ [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>> {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
},
}, req.raw());
}
+kj::Promise<Result<Worker::Results>> Worker::updateStatistics()
+try {
+ while (true) {
+ statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire();
-template<typename K, typename G>
-static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap)
-{
- /* !!! inefficient */
- for (auto i = goalMap.begin();
- i != goalMap.end(); )
- if (i->second.lock() == goal) {
- auto j = i; ++j;
- goalMap.erase(i);
- i = j;
- }
- else ++i;
-}
-
-
-void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
-{
- goal->trace("done");
- assert(!goal->exitCode.has_value());
- goal->exitCode = f.exitCode;
- goal->ex = f.ex;
-
- permanentFailure |= f.permanentFailure;
- timedOut |= f.timedOut;
- hashMismatch |= f.hashMismatch;
- checkMismatch |= f.checkMismatch;
-
- for (auto & i : goal->waiters) {
- if (GoalPtr waiting = i.lock()) {
- assert(waiting->waitees.count(goal));
- waiting->waitees.erase(goal);
-
- waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size()));
-
- if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed;
- if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters;
- if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure;
-
- if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) {
- /* If we failed and keepGoing is not set, we remove all
- remaining waitees. */
- for (auto & i : waiting->waitees) {
- i->waiters.extract(waiting);
- }
- waiting->waitees.clear();
-
- wakeUp(waiting);
- }
-
- waiting->waiteeDone(goal);
- }
- }
- goal->waiters.clear();
- removeGoal(goal);
- goal->cleanup();
-}
-
-void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
-{
- std::visit(
- overloaded{
- [&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
- [&](Goal::WaitForAWhile) { waitForAWhile(goal); },
- [&](Goal::ContinueImmediately) { wakeUp(goal); },
- [&](Goal::WaitForGoals & w) {
- for (auto & dep : w.goals) {
- goal->waitees.insert(dep);
- dep->waiters.insert(goal);
- }
- },
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
- [&](Goal::Finished & f) { goalFinished(goal, f); },
- },
- how
- );
-}
-
-void Worker::removeGoal(GoalPtr goal)
-{
- if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal))
- nix::removeGoal(drvGoal, derivationGoals);
- else if (auto subGoal = std::dynamic_pointer_cast<PathSubstitutionGoal>(goal))
- nix::removeGoal(subGoal, substitutionGoals);
- else if (auto subGoal = std::dynamic_pointer_cast<DrvOutputSubstitutionGoal>(goal))
- nix::removeGoal(subGoal, drvOutputSubstitutionGoals);
- else
- assert(false);
-
- if (topGoals.find(goal) != topGoals.end()) {
- topGoals.erase(goal);
- /* If a top-level goal failed, then kill all other goals
- (unless keepGoing was set). */
- if (goal->exitCode == Goal::ecFailed && !settings.keepGoing)
- topGoals.clear();
- }
-}
-
-
-void Worker::wakeUp(GoalPtr goal)
-{
- goal->trace("woken up");
- awake.insert(goal);
-}
-
-
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot)
-{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
-}
-
-
-void Worker::childTerminated(Goal * goal)
-{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
-
- if (i->inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
- }
-
- children.erase(i);
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
-}
-
-
-void Worker::waitForAWhile(GoalPtr goal)
-{
- debug("wait for a while");
- waitingForAWhile.insert(goal);
-}
-
-
-void Worker::updateStatistics()
-{
- // only update progress info while running. this notably excludes updating
- // progress info while destroying, which causes the progress bar to assert
- if (running && statisticsOutdated) {
+ // only update progress info while running. this notably excludes updating
+ // progress info while destroying, which causes the progress bar to assert
actDerivations.progress(
doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds
);
@@ -338,221 +222,82 @@ void Worker::updateStatistics()
act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize);
act.setExpected(actCopyPath, expectedNarSize + doneNarSize);
- statisticsOutdated = false;
+ // limit to 50fps. that should be more than good enough for anything we do
+ co_await aio.provider->getTimer().afterDelay(20 * kj::MILLISECONDS);
}
+} catch (...) {
+ co_return result::failure(std::current_exception());
}
-Goals Worker::run(std::function<Goals (GoalFactory &)> req)
+Worker::Results Worker::run(std::function<Targets (GoalFactory &)> req)
{
- auto _topGoals = req(goalFactory());
+ auto topGoals = req(goalFactory());
assert(!running);
running = true;
Finally const _stop([&] { running = false; });
- updateStatistics();
+ auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<Results>>();
+ auto interruptCallback = createInterruptCallback([&] {
+ onInterrupt.fulfiller->fulfill(result::failure(std::make_exception_ptr(makeInterrupted())));
+ });
- topGoals = _topGoals;
+ auto promise = runImpl(std::move(topGoals))
+ .exclusiveJoin(updateStatistics())
+ .exclusiveJoin(std::move(onInterrupt.promise));
- debug("entered goal loop");
+ // TODO GC interface?
+ if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0u) {
+ // Periodically wake up to see if we need to run the garbage collector.
+ promise = promise.exclusiveJoin(boopGC(*localStore));
+ }
- while (1) {
+ return promise.wait(aio.waitScope).value();
+}
- checkInterrupt();
+kj::Promise<Result<Worker::Results>> Worker::runImpl(Targets topGoals)
+try {
+ debug("entered goal loop");
- // TODO GC interface?
- if (auto localStore = dynamic_cast<LocalStore *>(&store))
- localStore->autoGC(false);
+ kj::Vector<Targets::value_type> promises(topGoals.size());
+ for (auto & gp : topGoals) {
+ promises.add(std::move(gp));
+ }
- /* Call every wake goal (in the ordering established by
- CompareGoalPtrs). */
- while (!awake.empty() && !topGoals.empty()) {
- Goals awake2;
- for (auto & i : awake) {
- GoalPtr goal = i.lock();
- if (goal) awake2.insert(goal);
- }
- awake.clear();
- for (auto & goal : awake2) {
- checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
- updateStatistics();
-
- if (topGoals.empty()) break; // stuff may have been cancelled
- }
- }
+ Results results;
- if (topGoals.empty()) break;
+ auto collect = AsyncCollect(promises.releaseAsArray());
+ while (auto done = co_await collect.next()) {
+ // propagate goal exceptions outward
+ BOOST_OUTCOME_CO_TRY(auto result, done->second);
+ results.emplace(done->first, result);
- /* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
- waitForInput();
- else {
- assert(!awake.empty());
+ /* If a top-level goal failed, then kill all other goals
+ (unless keepGoing was set). */
+ if (result.exitCode == Goal::ecFailed && !settings.keepGoing) {
+ children.clear();
+ break;
}
}
/* If --keep-going is not set, it's possible that the main goal
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
- assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
- return _topGoals;
+ co_return std::move(results);
+} catch (...) {
+ co_return result::failure(std::current_exception());
}
-void Worker::waitForInput()
-{
- printMsg(lvlVomit, "waiting for children");
-
- /* Process output from the file descriptors attached to the
- children, namely log output and output path creation commands.
- We also use this to detect child termination: if we get EOF on
- the logger pipe of a build, we assume that the builder has
- terminated. */
-
- bool useTimeout = false;
- long timeout = 0;
- auto before = steady_time_point::clock::now();
-
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
- }
-
- /* If we are polling goals that are waiting for a lock, then wake
- up after a few seconds at most. */
- if (!waitingForAWhile.empty()) {
- useTimeout = true;
- if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
- timeout = std::max(1L,
- (long) std::chrono::duration_cast<std::chrono::seconds>(
- lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
- } else lastWokenUp = steady_time_point::min();
-
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
-
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
-
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
- }
-
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
-
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
- }
-
- if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
- lastWokenUp = after;
- for (auto & i : waitingForAWhile) {
- GoalPtr goal = i.lock();
- if (goal) wakeUp(goal);
- }
- waitingForAWhile.clear();
+kj::Promise<Result<Worker::Results>> Worker::boopGC(LocalStore & localStore)
+try {
+ while (true) {
+ co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS);
+ localStore.autoGC(false);
}
+} catch (...) {
+ co_return result::failure(std::current_exception());
}
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index 6735ea0b9..1a913ca16 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -1,6 +1,8 @@
#pragma once
///@file
+#include "async-semaphore.hh"
+#include "concepts.hh"
#include "notifying-counter.hh"
#include "types.hh"
#include "lock.hh"
@@ -18,37 +20,22 @@ namespace nix {
struct DerivationGoal;
struct PathSubstitutionGoal;
class DrvOutputSubstitutionGoal;
+class LocalStore;
typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
-/**
- * A mapping used to remember for each child process to what goal it
- * belongs, and file descriptors for receiving log data and output
- * path creation commands.
- */
-struct Child
-{
- WeakGoalPtr goal;
- Goal * goal2; // ugly hackery
- std::set<int> fds;
- bool inBuildSlot;
- /**
- * Time we last got output on stdout/stderr
- */
- steady_time_point lastOutput;
- steady_time_point timeStarted;
-};
-
/* Forward definition. */
struct HookInstance;
class GoalFactory
{
public:
- virtual std::shared_ptr<DerivationGoal> makeDerivationGoal(
+ virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makeDerivationGoal(
const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal
) = 0;
- virtual std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
+ virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makeBasicDerivationGoal(
const StorePath & drvPath,
const BasicDerivation & drv,
const OutputsSpec & wantedOutputs,
@@ -58,12 +45,14 @@ public:
/**
* @ref SubstitutionGoal "substitution goal"
*/
- virtual std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(
+ virtual std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makePathSubstitutionGoal(
const StorePath & storePath,
RepairFlag repair = NoRepair,
std::optional<ContentAddress> ca = std::nullopt
) = 0;
- virtual std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(
+ virtual std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makeDrvOutputSubstitutionGoal(
const DrvOutput & id,
RepairFlag repair = NoRepair,
std::optional<ContentAddress> ca = std::nullopt
@@ -75,7 +64,8 @@ public:
* It will be a `DerivationGoal` for a `DerivedPath::Built` or
* a `SubstitutionGoal` for a `DerivedPath::Opaque`.
*/
- virtual GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0;
+ virtual std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>
+ makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0;
};
// elaborate hoax to let goals access factory methods while hiding them from the public
@@ -94,61 +84,27 @@ protected:
*/
class Worker : public WorkerBase
{
+public:
+ using Targets = std::map<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>;
+ using Results = std::map<GoalPtr, Goal::WorkResult>;
+
private:
bool running = false;
- /* Note: the worker should only have strong pointers to the
- top-level goals. */
-
- /**
- * The top-level goals of the worker.
- */
- Goals topGoals;
-
- /**
- * Goals that are ready to do some work.
- */
- WeakGoals awake;
-
- /**
- * Goals waiting for a build slot.
- */
- WeakGoals wantingToBuild;
-
- /**
- * Child processes currently running.
- */
- std::list<Child> children;
-
- /**
- * Number of build slots occupied. This includes local builds but does not
- * include substitutions or remote builds via the build hook.
- */
- unsigned int nrLocalBuilds;
-
- /**
- * Number of substitution slots occupied.
- */
- unsigned int nrSubstitutions;
-
+ template<typename G>
+ struct CachedGoal
+ {
+ std::shared_ptr<G> goal;
+ kj::ForkedPromise<Result<Goal::WorkResult>> promise{nullptr};
+ };
/**
* Maps used to prevent multiple instantiations of a goal for the
* same derivation / path.
*/
- std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals;
- std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals;
- std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
-
- /**
- * Goals sleeping for a few seconds (polling a lock).
- */
- WeakGoals waitingForAWhile;
-
- /**
- * Last time the goals in `waitingForAWhile` where woken up.
- */
- steady_time_point lastWokenUp;
+ std::map<StorePath, CachedGoal<DerivationGoal>> derivationGoals;
+ std::map<StorePath, CachedGoal<PathSubstitutionGoal>> substitutionGoals;
+ std::map<DrvOutput, CachedGoal<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
/**
* Cache for pathContentsGood().
@@ -176,60 +132,25 @@ private:
*/
bool checkMismatch = false;
- void goalFinished(GoalPtr goal, Goal::Finished & f);
- void handleWorkResult(GoalPtr goal, Goal::WorkResult how);
-
- /**
- * Put `goal` to sleep until a build slot becomes available (which
- * might be right away).
- */
- void waitForBuildSlot(GoalPtr goal);
-
- /**
- * Wait for a few seconds and then retry this goal. Used when
- * waiting for a lock held by another process. This kind of
- * polling is inefficient, but POSIX doesn't really provide a way
- * to wait for multiple locks in the main select() loop.
- */
- void waitForAWhile(GoalPtr goal);
-
- /**
- * Wake up a goal (i.e., there is something for it to do).
- */
- void wakeUp(GoalPtr goal);
-
- /**
- * Wait for input to become available.
- */
- void waitForInput();
-
- /**
- * Remove a dead goal.
- */
- void removeGoal(GoalPtr goal);
-
- /**
- * Registers a running child process. `inBuildSlot` means that
- * the process counts towards the jobs limit.
- */
- void childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot);
-
/**
* Pass current stats counters to the logger for progress bar updates.
*/
- void updateStatistics();
+ kj::Promise<Result<Results>> updateStatistics();
- bool statisticsOutdated = true;
+ AsyncSemaphore statisticsUpdateSignal{1};
+ std::optional<AsyncSemaphore::Token> statisticsUpdateInhibitor;
/**
* Mark statistics as outdated, such that `updateStatistics` will be called.
*/
void updateStatisticsLater()
{
- statisticsOutdated = true;
+ statisticsUpdateInhibitor = {};
}
+ kj::Promise<Result<Results>> runImpl(Targets topGoals);
+ kj::Promise<Result<Results>> boopGC(LocalStore & localStore);
+
public:
const Activity act;
@@ -239,7 +160,12 @@ public:
Store & store;
Store & evalStore;
kj::AsyncIoContext & aio;
+ AsyncSemaphore substitutions, localBuilds;
+private:
+ kj::TaskSet children;
+
+public:
struct HookState {
std::unique_ptr<HookInstance> instance;
@@ -277,21 +203,35 @@ public:
* @ref DerivationGoal "derivation goal"
*/
private:
- std::shared_ptr<DerivationGoal> makeDerivationGoalCommon(
- const StorePath & drvPath, const OutputsSpec & wantedOutputs,
- std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal);
- std::shared_ptr<DerivationGoal> makeDerivationGoal(
+ template<typename ID, std::derived_from<Goal> G>
+ std::pair<std::shared_ptr<G>, kj::Promise<Result<Goal::WorkResult>>> makeGoalCommon(
+ std::map<ID, CachedGoal<G>> & map,
+ const ID & key,
+ InvocableR<std::unique_ptr<G>> auto create,
+ InvocableR<bool, G &> auto modify
+ );
+ std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> makeDerivationGoal(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override;
- std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
+ std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<Result<Goal::WorkResult>>> makeBasicDerivationGoal(
const StorePath & drvPath, const BasicDerivation & drv,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override;
/**
* @ref SubstitutionGoal "substitution goal"
*/
- std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override;
- std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override;
+ std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makePathSubstitutionGoal(
+ const StorePath & storePath,
+ RepairFlag repair = NoRepair,
+ std::optional<ContentAddress> ca = std::nullopt
+ ) override;
+ std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<Result<Goal::WorkResult>>>
+ makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id,
+ RepairFlag repair = NoRepair,
+ std::optional<ContentAddress> ca = std::nullopt
+ ) override;
/**
* Make a goal corresponding to the `DerivedPath`.
@@ -299,18 +239,14 @@ private:
* It will be a `DerivationGoal` for a `DerivedPath::Built` or
* a `SubstitutionGoal` for a `DerivedPath::Opaque`.
*/
- GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
+ std::pair<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>
+ makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
public:
/**
- * Unregisters a running child process.
- */
- void childTerminated(Goal * goal);
-
- /**
* Loop until the specified top-level goals have finished.
*/
- Goals run(std::function<Goals (GoalFactory &)> req);
+ Results run(std::function<Targets (GoalFactory &)> req);
/***
* The exit status in case of failure.