diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libstore/build/derivation-goal.cc | 234 | ||||
-rw-r--r-- | src/libstore/build/derivation-goal.hh | 30 | ||||
-rw-r--r-- | src/libstore/build/drv-output-substitution-goal.cc | 33 | ||||
-rw-r--r-- | src/libstore/build/drv-output-substitution-goal.hh | 6 | ||||
-rw-r--r-- | src/libstore/build/entry-points.cc | 30 | ||||
-rw-r--r-- | src/libstore/build/goal.cc | 42 | ||||
-rw-r--r-- | src/libstore/build/goal.hh | 69 | ||||
-rw-r--r-- | src/libstore/build/local-derivation-goal.cc | 21 | ||||
-rw-r--r-- | src/libstore/build/local-derivation-goal.hh | 2 | ||||
-rw-r--r-- | src/libstore/build/substitution-goal.cc | 44 | ||||
-rw-r--r-- | src/libstore/build/substitution-goal.hh | 11 | ||||
-rw-r--r-- | src/libstore/build/worker.cc | 398 | ||||
-rw-r--r-- | src/libstore/build/worker.hh | 144 | ||||
-rw-r--r-- | src/nix-channel/meson.build | 6 |
14 files changed, 522 insertions, 548 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 827c9f541..b8c4d278d 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -11,7 +11,13 @@ #include "drv-output-substitution-goal.hh" #include "strings.hh" +#include <boost/outcome/try.hpp> #include <fstream> +#include <kj/array.h> +#include <kj/async-unix.h> +#include <kj/async.h> +#include <kj/debug.h> +#include <kj/vector.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> @@ -131,9 +137,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex) } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) @@ -170,7 +176,7 @@ try { state = &DerivationGoal::loadDerivation; - return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}}; + return waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath)); } catch (...) { return {std::current_exception()}; } @@ -269,13 +275,13 @@ try { /* We are first going to try to create the invalid output paths through substitutes. If that doesn't work, we'll build them. */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; if (settings.useSubstitutes) { if (parsedDrv->substitutesAllowed()) { for (auto & [outputName, status] : initialOutputs) { if (!status.wanted) continue; if (!status.known) - result.goals.insert( + dependencies.add( worker.goalFactory().makeDrvOutputSubstitutionGoal( DrvOutput{status.outputHash, outputName}, buildMode == bmRepair ? Repair : NoRepair @@ -283,7 +289,7 @@ try { ); else { auto * cap = getDerivationCA(*drv); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal( + dependencies.add(worker.goalFactory().makePathSubstitutionGoal( status.known->path, buildMode == bmRepair ? Repair : NoRepair, cap ? std::optional { *cap } : std::nullopt)); @@ -294,11 +300,11 @@ try { } } - if (result.goals.empty()) { /* to prevent hang (no wake-up event) */ + if (dependencies.empty()) { /* to prevent hang (no wake-up event) */ return outputsSubstitutionTried(inBuildSlot); } else { state = &DerivationGoal::outputsSubstitutionTried; - return {std::move(result)}; + return waitForGoals(dependencies.releaseAsArray()); } } catch (...) { return {std::current_exception()}; @@ -380,7 +386,7 @@ try { produced using a substitute. So we have to build instead. */ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept try { - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; /* At this point we are building all outputs, so if more are wanted there is no need to restart. */ @@ -393,7 +399,7 @@ try { addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) { if (!inputNode.value.empty()) - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = inputDrv, .outputs = inputNode.value, @@ -438,14 +444,14 @@ try { if (!settings.useSubstitutes) throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled", worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i)); } - if (result.goals.empty()) {/* to prevent hang (no wake-up event) */ + if (dependencies.empty()) {/* to prevent hang (no wake-up event) */ return inputsRealised(inBuildSlot); } else { state = &DerivationGoal::inputsRealised; - return {result}; + return waitForGoals(dependencies.releaseAsArray()); } } catch (...) { return {std::current_exception()}; @@ -488,7 +494,7 @@ try { } /* Check each path (slow!). */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (auto & i : outputClosure) { if (worker.pathContentsGood(i)) continue; printError( @@ -496,9 +502,9 @@ try { worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); auto drvPath2 = outputsToDrv.find(i); if (drvPath2 == outputsToDrv.end()) - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); else - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath2->second), .outputs = OutputsSpec::All { }, @@ -506,12 +512,12 @@ try { bmRepair)); } - if (result.goals.empty()) { + if (dependencies.empty()) { return {done(BuildResult::AlreadyValid, assertPathValidity())}; } state = &DerivationGoal::closureRepaired; - return {result}; + return waitForGoals(dependencies.releaseAsArray()); } catch (...) { return {std::current_exception()}; } @@ -611,11 +617,12 @@ try { worker.store.printStorePath(pathResolved), }); - resolvedDrvGoal = worker.goalFactory().makeDerivationGoal( + auto dependency = worker.goalFactory().makeDerivationGoal( pathResolved, wantedOutputs, buildMode); + resolvedDrvGoal = dependency.first; state = &DerivationGoal::resolvedFinished; - return {WaitForGoals{{resolvedDrvGoal}}}; + return waitForGoals(std::move(dependency)); } std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths; @@ -733,7 +740,7 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } actLock.reset(); @@ -773,32 +780,32 @@ try { auto hookReply = tryBuildHook(inBuildSlot); auto result = std::visit( overloaded{ - [&](HookReply::Accept & a) -> std::optional<WorkResult> { + [&](HookReply::Accept & a) -> std::optional<kj::Promise<Result<WorkResult>>> { /* Yes, it has started doing so. Wait until we get EOF from the hook. */ actLock.reset(); buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return WaitForWorld{std::move(a.fds), false}; + return {{WaitForWorld{std::move(a.promise)}}}; }, - [&](HookReply::Postpone) -> std::optional<WorkResult> { + [&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> { /* Not now; wait until at least one child finishes or the wake-up timeout expires. */ if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); outputLocks.unlock(); - return WaitForAWhile{}; + return waitForAWhile(); }, - [&](HookReply::Decline) -> std::optional<WorkResult> { + [&](HookReply::Decline) -> std::optional<kj::Promise<Result<WorkResult>>> { /* We should do it ourselves. */ return std::nullopt; }, }, hookReply); if (result) { - return {std::move(*result)}; + return std::move(*result); } } @@ -977,6 +984,7 @@ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot try { trace("build done"); + slotToken = {}; Finally releaseBuildUser([&](){ this->cleanupHookFinally(); }); cleanupPreChildKill(); @@ -992,9 +1000,6 @@ try { buildResult.timesBuilt++; buildResult.stopTime = time(0); - /* So the child is gone now. */ - worker.childTerminated(this); - /* Close the read side of the logger pipe. */ closeReadPipes(); @@ -1266,12 +1271,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set<int> fds; - fds.insert(hook->fromHook.get()); - fds.insert(hook->builderOut.get()); builderOutFD = &hook->builderOut; - - return HookReply::Accept{std::move(fds)}; + return HookReply::Accept{handleChildOutput()}; } @@ -1331,23 +1332,69 @@ void DerivationGoal::closeLogFile() } -Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data) +Goal::Finished DerivationGoal::tooMuchLogs() { - assert(builderOutFD); + killChild(); + return done( + BuildResult::LogLimitExceeded, {}, + Error("%s killed after writing more than %d bytes of log output", + getName(), settings.maxLogSize)); +} - auto tooMuchLogs = [&] { - killChild(); - return done( - BuildResult::LogLimitExceeded, {}, - Error("%s killed after writing more than %d bytes of log output", - getName(), settings.maxLogSize)); - }; +struct DerivationGoal::InputStream final : private kj::AsyncObject +{ + int fd; + kj::UnixEventPort::FdObserver observer; + + InputStream(kj::UnixEventPort & ep, int fd) + : fd(fd) + , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ) + { + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + throw SysError("fcntl(F_GETFL) failed on fd %i", fd); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + throw SysError("fcntl(F_SETFL) failed on fd %i", fd); + } + } + + kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer) + { + const auto res = ::read(fd, buffer.begin(), buffer.size()); + // closing a pty endpoint causes EIO on the other endpoint. stock kj streams + // do not handle this and throw exceptions we can't ask for errno instead :( + // (we can't use `errno` either because kj may well have mangled it by now.) + if (res == 0 || (res == -1 && errno == EIO)) { + return std::string_view{}; + } + + KJ_NONBLOCKING_SYSCALL(res) {} + + if (res > 0) { + return std::string_view{buffer.begin(), static_cast<size_t>(res)}; + } + + return observer.whenBecomesReadable().then([this, buffer] { + return read(buffer); + }); + } +}; + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - // local & `ssh://`-builds are dealt with here. - if (fd == builderOutFD->get()) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } for (auto c : data) @@ -1362,10 +1409,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } if (logSink) (*logSink)(data); - return StillAlive{}; } +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - if (hook && fd == hook->fromHook.get()) { for (auto c : data) if (c == '\n') { auto json = parseJSONMessage(currentHookLine); @@ -1381,7 +1440,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data (fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n"; logSize += logLine.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } (*logSink)(logLine); } else if (type == resSetPhase && ! fields.is_null()) { @@ -1405,16 +1464,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } else currentHookLine += c; } - - return StillAlive{}; +} catch (...) { + co_return std::current_exception(); } +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept +try { + assert(builderOutFD); + + auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get()); + kj::Own<InputStream> hookIn; + if (hook) { + hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get()); + } + + auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); -void DerivationGoal::handleEOF(int fd) + if (respectsTimeouts() && settings.buildTimeout != 0) { + handlers = handlers.exclusiveJoin( + worker.aio.provider->getTimer() + .afterDelay(settings.buildTimeout.get() * kj::SECONDS) + .then([this]() -> Outcome<void, Finished> { + return timedOut( + Error("%1% timed out after %2% seconds", name, settings.buildTimeout) + ); + }) + ); + } + + return handlers.then([this](auto r) -> Outcome<void, Finished> { + if (!currentLogLine.empty()) flushLine(); + return r; + }); +} catch (...) { + return {std::current_exception()}; +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept { - if (!currentLogLine.empty()) flushLine(); + while (true) { + const auto stash = lastChildActivity; + auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS; + co_await worker.aio.provider->getTimer().atTime(waitUntil); + if (lastChildActivity == stash) { + co_return timedOut( + Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime) + ); + } + } } +kj::Promise<Outcome<void, Goal::Finished>> +DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept +{ + lastChildActivity = worker.aio.provider->getTimer().now(); + + auto handlers = kj::joinPromisesFailFast([&] { + kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2}; + + parts.add(handleBuilderOutput(builderIn)); + if (hookIn) { + parts.add(handleHookOutput(*hookIn)); + } + + return parts.releaseAsArray(); + }()); + + if (respectsTimeouts() && settings.maxSilentTime != 0) { + handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) { + return kj::arr(std::move(r)); + })); + } + + for (auto r : co_await handlers) { + BOOST_OUTCOME_CO_TRYV(r); + } + co_return result::success(); +} void DerivationGoal::flushLine() { diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh index 020388d5a..d60bb0b4c 100644 --- a/src/libstore/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -8,6 +8,7 @@ #include "store-api.hh" #include "pathlocks.hh" #include "goal.hh" +#include <kj/time.h> namespace nix { @@ -17,7 +18,7 @@ struct HookInstance; struct HookReplyBase { struct [[nodiscard]] Accept { - std::set<int> fds; + kj::Promise<Outcome<void, Goal::Finished>> promise; }; struct [[nodiscard]] Decline {}; struct [[nodiscard]] Postpone {}; @@ -70,6 +71,8 @@ struct InitialOutput { */ struct DerivationGoal : public Goal { + struct InputStream; + /** * Whether to use an on-disk .drv file. */ @@ -242,11 +245,11 @@ struct DerivationGoal : public Goal BuildMode buildMode = bmNormal); virtual ~DerivationGoal() noexcept(false); - Finished timedOut(Error && ex) override; + Finished timedOut(Error && ex); std::string key() override; - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; /** * Add wanted outputs to an already existing derivation goal. @@ -312,13 +315,19 @@ struct DerivationGoal : public Goal virtual void cleanupPostOutputsRegisteredModeCheck(); virtual void cleanupPostOutputsRegisteredModeNonCheck(); - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; - void handleEOF(int fd) override; +protected: + kj::TimePoint lastChildActivity = kj::minValue; + + kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept; + kj::Promise<Outcome<void, Finished>> + handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept; + kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept; + kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept; + kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept; + Finished tooMuchLogs(); void flushLine(); +public: /** * Wrappers around the corresponding Store methods that first consult the * derivation. This is currently needed because when there is no drv file @@ -357,6 +366,11 @@ struct DerivationGoal : public Goal void waiteeDone(GoalPtr waitee) override; + virtual bool respectsTimeouts() + { + return false; + } + StorePathSet exportReferences(const StorePathSet & storePaths); JobCategory jobCategory() const override { diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index 7986123cc..80b2c4cfb 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -4,6 +4,9 @@ #include "worker.hh" #include "substitution-goal.hh" #include "signals.hh" +#include <kj/array.h> +#include <kj/async.h> +#include <kj/vector.h> namespace nix { @@ -42,7 +45,10 @@ try { trace("trying next substituter"); if (!inBuildSlot) { - return {WaitForSlot{}}; + return worker.substitutions.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); @@ -69,25 +75,28 @@ try { some other error occurs), so it must not touch `this`. So put the shared state in a separate refcounted object. */ downloadState = std::make_shared<DownloadState>(); - downloadState->outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>(); + downloadState->outPipe = kj::mv(pipe.fulfiller); downloadState->result = std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] { + Finally updateStats([&]() { downloadState->outPipe->fulfill(); }); ReceiveInterrupts receiveInterrupts; - Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); }); return sub->queryRealisation(id); }); state = &DrvOutputSubstitutionGoal::realisationFetched; - return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}}; + return {WaitForWorld{ + pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }) + }}; } catch (...) { return {std::current_exception()}; } kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept try { - worker.childTerminated(this); maintainRunningSubstitutions.reset(); + slotToken = {}; try { outputInfo = downloadState->result.get(); @@ -100,7 +109,7 @@ try { return tryNext(inBuildSlot); } - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { if (depId != id) { if (auto localOutputInfo = worker.store.queryRealisation(depId); @@ -116,17 +125,17 @@ try { ); return tryNext(inBuildSlot); } - result.goals.insert(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId)); + dependencies.add(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId)); } } - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath)); - if (result.goals.empty()) { + if (dependencies.empty()) { return outPathValid(inBuildSlot); } else { state = &DrvOutputSubstitutionGoal::outPathValid; - return {std::move(result)}; + return waitForGoals(dependencies.releaseAsArray()); } } catch (...) { return {std::current_exception()}; @@ -166,9 +175,9 @@ std::string DrvOutputSubstitutionGoal::key() return "a$" + std::string(id.to_string()); } -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index f33196665..805b65bfa 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal { struct DownloadState { - Pipe outPipe; + kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe; std::future<std::shared_ptr<const Realisation>> result; }; @@ -74,11 +74,9 @@ public: kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept; kj::Promise<Result<WorkResult>> finished() noexcept; - Finished timedOut(Error && ex) override { abort(); }; - std::string key() override; - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; JobCategory jobCategory() const override { return JobCategory::Substitution; diff --git a/src/libstore/build/entry-points.cc b/src/libstore/build/entry-points.cc index a0f18a02c..27c341295 100644 --- a/src/libstore/build/entry-points.cc +++ b/src/libstore/build/entry-points.cc @@ -17,9 +17,9 @@ void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMod Worker worker(*this, evalStore ? *evalStore : *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - Goals goals; + Worker::Targets goals; for (auto & br : reqs) - goals.insert(gf.makeGoal(br, buildMode)); + goals.emplace(gf.makeGoal(br, buildMode)); return goals; }); @@ -60,11 +60,11 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults( std::vector<std::pair<const DerivedPath &, GoalPtr>> state; auto goals = runWorker(worker, [&](GoalFactory & gf) { - Goals goals; + Worker::Targets goals; for (const auto & req : reqs) { auto goal = gf.makeGoal(req, buildMode); - goals.insert(goal); - state.push_back({req, goal}); + state.push_back({req, goal.first}); + goals.emplace(std::move(goal)); } return goals; }); @@ -84,8 +84,10 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat Worker worker(*this, *this, aio); try { - auto goals = runWorker(worker, [&](GoalFactory & gf) -> Goals { - return Goals{gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)}; + auto goals = runWorker(worker, [&](GoalFactory & gf) { + Worker::Targets goals; + goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)); + return goals; }); auto goal = *goals.begin(); return goal->buildResult.restrictTo(DerivedPath::Built { @@ -110,7 +112,9 @@ void Store::ensurePath(const StorePath & path) Worker worker(*this, *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - return Goals{gf.makePathSubstitutionGoal(path)}; + Worker::Targets goals; + goals.emplace(gf.makePathSubstitutionGoal(path)); + return goals; }); auto goal = *goals.begin(); @@ -130,7 +134,9 @@ void Store::repairPath(const StorePath & path) Worker worker(*this, *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - return Goals{gf.makePathSubstitutionGoal(path, Repair)}; + Worker::Targets goals; + goals.emplace(gf.makePathSubstitutionGoal(path, Repair)); + return goals; }); auto goal = *goals.begin(); @@ -140,14 +146,16 @@ void Store::repairPath(const StorePath & path) auto info = queryPathInfo(path); if (info->deriver && isValidPath(*info->deriver)) { worker.run([&](GoalFactory & gf) { - return Goals{gf.makeGoal( + Worker::Targets goals; + goals.emplace(gf.makeGoal( DerivedPath::Built{ .drvPath = makeConstantStorePathRef(*info->deriver), // FIXME: Should just build the specific output we need. .outputs = OutputsSpec::All{}, }, bmRepair - )}; + )); + return goals; }); } else throw Error(worker.failingExitStatus(), "cannot repair path '%s'", printStorePath(path)); diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 82861ad2b..8a2f4ab35 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -1,4 +1,7 @@ #include "goal.hh" +#include "async-collect.hh" +#include "worker.hh" +#include <kj/time.h> namespace nix { @@ -15,4 +18,43 @@ void Goal::trace(std::string_view s) debug("%1%: %2%", name, s); } +kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile() +try { + trace("wait for a while"); + /* If we are polling goals that are waiting for a lock, then wake + up after a few seconds at most. */ + co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS); + co_return ContinueImmediately{}; +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Result<Goal::WorkResult>> +Goal::waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept +try { + auto left = dependencies.size(); + auto collectDeps = asyncCollect(std::move(dependencies)); + + while (auto item = co_await collectDeps.next()) { + left--; + auto & dep = *item; + + trace(fmt("waitee '%s' done; %d left", dep->name, left)); + + if (dep->exitCode != Goal::ecSuccess) ++nrFailed; + if (dep->exitCode == Goal::ecNoSubstituters) ++nrNoSubstituters; + if (dep->exitCode == Goal::ecIncompleteClosure) ++nrIncompleteClosure; + + waiteeDone(dep); + + if (dep->exitCode == ecFailed && !settings.keepGoing) { + co_return result::success(ContinueImmediately{}); + } + } + + co_return result::success(ContinueImmediately{}); +} catch (...) { + co_return result::failure(std::current_exception()); +} + } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 189505308..e7a500a00 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -1,10 +1,12 @@ #pragma once ///@file +#include "async-semaphore.hh" #include "result.hh" #include "types.hh" #include "store-api.hh" #include "build-result.hh" +#include <concepts> // IWYU pragma: keep #include <kj/async.h> namespace nix { @@ -70,17 +72,6 @@ struct Goal const bool isDependency; /** - * Goals that this goal is waiting for. - */ - Goals waitees; - - /** - * Goals waiting for this one to finish. Must use weak pointers - * here to prevent cycles. - */ - WeakGoals waiters; - - /** * Number of goals we are/were waiting for that have failed. */ size_t nrFailed = 0; @@ -112,18 +103,20 @@ struct Goal */ BuildResult buildResult; + // for use by Worker only. will go away once work() is a promise. + kj::Own<kj::PromiseFulfiller<void>> notify; + +protected: + AsyncSemaphore::Token slotToken; + public: + struct Finished; + struct [[nodiscard]] StillAlive {}; - struct [[nodiscard]] WaitForSlot {}; - struct [[nodiscard]] WaitForAWhile {}; struct [[nodiscard]] ContinueImmediately {}; - struct [[nodiscard]] WaitForGoals { - Goals goals; - }; struct [[nodiscard]] WaitForWorld { - std::set<int> fds; - bool inBuildSlot; + kj::Promise<Outcome<void, Finished>> promise; }; struct [[nodiscard]] Finished { ExitCode exitCode; @@ -137,10 +130,7 @@ public: struct [[nodiscard]] WorkResult : std::variant< StillAlive, - WaitForSlot, - WaitForAWhile, ContinueImmediately, - WaitForGoals, WaitForWorld, Finished> { @@ -148,6 +138,20 @@ public: using variant::variant; }; +protected: + kj::Promise<Result<WorkResult>> waitForAWhile(); + kj::Promise<Result<WorkResult>> + waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept; + + template<std::derived_from<Goal>... G> + kj::Promise<Result<Goal::WorkResult>> + waitForGoals(std::pair<std::shared_ptr<G>, kj::Promise<void>>... goals) noexcept + { + return waitForGoals(kj::arrOf<std::pair<GoalPtr, kj::Promise<void>>>(std::move(goals)...)); + } + +public: + /** * Exception containing an error message, if any. */ @@ -163,24 +167,10 @@ public: trace("goal destroyed"); } - virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0; + virtual kj::Promise<Result<WorkResult>> work() noexcept = 0; virtual void waiteeDone(GoalPtr waitee) { } - virtual WorkResult handleChildOutput(int fd, std::string_view data) - { - abort(); - } - - virtual void handleEOF(int fd) - { - } - - virtual bool respectsTimeouts() - { - return false; - } - void trace(std::string_view s); std::string getName() const @@ -188,13 +178,6 @@ public: return name; } - /** - * Callback in case of a timeout. It should wake up its waiters, - * get rid of any running child processes that are being monitored - * by the worker (important!), etc. - */ - virtual Finished timedOut(Error && ex) = 0; - virtual std::string key() = 0; virtual void cleanup() { } diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index f14d09652..2443cfb5a 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore() void LocalDerivationGoal::killChild() { if (pid) { - worker.childTerminated(this); - /* If we're using a build user, then there is a tricky race condition: if we kill the build user before the child has done its setuid() to the build user uid, then it won't be @@ -158,8 +156,11 @@ try { if (!inBuildSlot) { state = &DerivationGoal::tryToBuild; outputLocks.unlock(); - if (0U != settings.maxBuildJobs) { - return {WaitForSlot{}}; + if (worker.localBuilds.capacity() > 0) { + return worker.localBuilds.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } if (getMachines().empty()) { throw Error( @@ -214,7 +215,7 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } } @@ -243,14 +244,14 @@ try { try { /* Okay, we have to build. */ - auto fds = startBuilder(); + auto promise = startBuilder(); /* This state will be reached when we get EOF on the child's log pipe. */ state = &DerivationGoal::buildDone; started(); - return {WaitForWorld{std::move(fds), true}}; + return {WaitForWorld{std::move(promise)}}; } catch (BuildError & e) { outputLocks.unlock(); @@ -390,7 +391,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck() cleanupPostOutputsRegisteredModeCheck(); } -std::set<int> LocalDerivationGoal::startBuilder() +// NOTE this one isn't noexcept because it's called from places that expect +// exceptions to signal failure to launch. we should change this some time. +kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder() { if ((buildUser && buildUser->getUIDCount() != 1) #if __linux__ @@ -779,7 +782,7 @@ std::set<int> LocalDerivationGoal::startBuilder() msgs.push_back(std::move(msg)); } - return {builderOutPTY.get()}; + return handleChildOutput(); } diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh index cd040bc15..6239129ab 100644 --- a/src/libstore/build/local-derivation-goal.hh +++ b/src/libstore/build/local-derivation-goal.hh @@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal /** * Start building a derivation. */ - std::set<int> startBuilder(); + kj::Promise<Outcome<void, Finished>> startBuilder(); /** * Fill in the environment for the builder. diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index bd0ffcb9b..74a63ca21 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -3,6 +3,8 @@ #include "nar-info.hh" #include "signals.hh" #include "finally.hh" +#include <kj/array.h> +#include <kj/vector.h> namespace nix { @@ -45,9 +47,9 @@ Goal::Finished PathSubstitutionGoal::done( } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } @@ -160,16 +162,16 @@ try { /* To maintain the closure invariant, we first have to realise the paths referenced by this one. */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (auto & i : info->references) if (i != storePath) /* ignore self-references */ - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i)); - if (result.goals.empty()) {/* to prevent hang (no wake-up event) */ + if (dependencies.empty()) {/* to prevent hang (no wake-up event) */ return referencesValid(inBuildSlot); } else { state = &PathSubstitutionGoal::referencesValid; - return {std::move(result)}; + return waitForGoals(dependencies.releaseAsArray()); } } catch (...) { return {std::current_exception()}; @@ -203,21 +205,25 @@ try { trace("trying to run"); if (!inBuildSlot) { - return {WaitForSlot{}}; + return worker.substitutions.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); - outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>(); + outPipe = kj::mv(pipe.fulfiller); thr = std::async(std::launch::async, [this]() { + /* Wake up the worker loop when we're done. */ + Finally updateStats([this]() { outPipe->fulfill(); }); + auto & fetchPath = subPath ? *subPath : storePath; try { ReceiveInterrupts receiveInterrupts; - /* Wake up the worker loop when we're done. */ - Finally updateStats([this]() { outPipe.writeSide.close(); }); - Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()}); PushActivity pact(act.id); @@ -234,7 +240,9 @@ try { }); state = &PathSubstitutionGoal::finished; - return {WaitForWorld{{outPipe.readSide.get()}, true}}; + return {WaitForWorld{ + pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }) + }}; } catch (...) { return {std::current_exception()}; } @@ -244,9 +252,8 @@ kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuil try { trace("substitute finished"); - worker.childTerminated(this); - try { + slotToken = {}; thr.get(); } catch (std::exception & e) { printError(e.what()); @@ -288,22 +295,13 @@ try { } -Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data) -{ - return StillAlive{}; -} - - void PathSubstitutionGoal::cleanup() { try { if (thr.valid()) { // FIXME: signal worker thread to quit. thr.get(); - worker.childTerminated(this); } - - outPipe.close(); } catch (...) { ignoreException(); } diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 3c97b19fd..cef3a4c5c 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal /** * Pipe for the substituter's standard output. */ - Pipe outPipe; + kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe; /** * The substituter thread. @@ -90,8 +90,6 @@ public: ); ~PathSubstitutionGoal(); - Finished timedOut(Error && ex) override { abort(); }; - /** * We prepend "a$" to the key name to ensure substitution goals * happen before derivation goals. @@ -101,7 +99,7 @@ public: return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); } - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; /** * The states. @@ -112,11 +110,6 @@ public: kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept; kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept; - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; - /* Called by destructor, can't be overridden */ void cleanup() override final; diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..68071a94c 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -7,10 +7,19 @@ #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep -#include <poll.h> - namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + /* Make sure that we are always allowed to run at least one substitution. + This prevents infinite waiting. */ + , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs)) + , localBuilds(settings.maxBuildJobs) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ - nrLocalBuilds = 0; - nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -33,6 +44,7 @@ Worker::~Worker() are in trouble, since goals may call childTerminated() etc. in their destructors). */ topGoals.clear(); + children.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -40,26 +52,28 @@ Worker::~Worker() } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon( +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoalCommon( const StorePath & drvPath, const OutputsSpec & wantedOutputs, std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal) { - std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath]; - std::shared_ptr<DerivationGoal> goal = goal_weak.lock(); + auto & goal_weak = derivationGoals[drvPath]; + std::shared_ptr<DerivationGoal> goal = goal_weak.goal.lock(); if (!goal) { goal = mkDrvGoal(); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } else { goal->addWantedOutputs(wantedOutputs); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath, - const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoal( + const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode +) { return makeDerivationGoalCommon( drvPath, @@ -77,8 +91,12 @@ std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drv } -std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath, - const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeBasicDerivationGoal( + const StorePath & drvPath, + const BasicDerivation & drv, + const OutputsSpec & wantedOutputs, + BuildMode buildMode +) { return makeDerivationGoalCommon( drvPath, @@ -96,55 +114,63 @@ std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath } -std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> +Worker::makePathSubstitutionGoal( + const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path]; - auto goal = goal_weak.lock(); // FIXME + auto & goal_weak = substitutionGoals[path]; + auto goal = goal_weak.goal.lock(); // FIXME if (!goal) { goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> +Worker::makeDrvOutputSubstitutionGoal( + const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id]; - auto goal = goal_weak.lock(); // FIXME + auto & goal_weak = drvOutputSubstitutionGoals[id]; + auto goal = goal_weak.goal.lock(); // FIXME if (!goal) { goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); - goal_weak = goal; + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; wakeUp(goal); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) +std::pair<GoalPtr, kj::Promise<void>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) { return std::visit(overloaded { - [&](const DerivedPath::Built & bfd) -> GoalPtr { + [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<void>> { if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath)) return makeDerivationGoal(bop->path, bfd.outputs, buildMode); else throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); }, - [&](const DerivedPath::Opaque & bo) -> GoalPtr { + [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<void>> { return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); }, }, req.raw()); } -template<typename K, typename G> -static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap) +template<typename G> +static void removeGoal(std::shared_ptr<G> goal, auto & goalMap) { /* !!! inefficient */ for (auto i = goalMap.begin(); i != goalMap.end(); ) - if (i->second.lock() == goal) { + if (i->second.goal.lock() == goal) { auto j = i; ++j; goalMap.erase(i); i = j; @@ -165,33 +191,8 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f) hashMismatch |= f.hashMismatch; checkMismatch |= f.checkMismatch; - for (auto & i : goal->waiters) { - if (GoalPtr waiting = i.lock()) { - assert(waiting->waitees.count(goal)); - waiting->waitees.erase(goal); - - waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size())); - - if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed; - if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters; - if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure; - - if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) { - /* If we failed and keepGoing is not set, we remove all - remaining waitees. */ - for (auto & i : waiting->waitees) { - i->waiters.extract(waiting); - } - waiting->waitees.clear(); - - wakeUp(waiting); - } - - waiting->waiteeDone(goal); - } - } - goal->waiters.clear(); removeGoal(goal); + goal->notify->fulfill(); goal->cleanup(); } @@ -200,20 +201,23 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) std::visit( overloaded{ [&](Goal::StillAlive) {}, - [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, [&](Goal::ContinueImmediately) { wakeUp(goal); }, - [&](Goal::WaitForGoals & w) { - for (auto & dep : w.goals) { - goal->waitees.insert(dep); - dep->waiters.insert(goal); - } + [&](Goal::WaitForWorld & w) { + childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> { + if (r.has_value()) { + return {Goal::ContinueImmediately{}}; + } else if (r.has_error()) { + return {std::move(r).error()}; + } else { + return r.exception(); + } + })); }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how ); + updateStatistics(); } void Worker::removeGoal(GoalPtr goal) @@ -244,80 +248,27 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot) +void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise) { - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - nrSubstitutions++; - break; - case JobCategory::Build: - nrLocalBuilds++; - break; - default: - abort(); - } - } + children.add(promise + .then([this, goal](auto result) { + if (result.has_value()) { + handleWorkResult(goal, std::move(result.assume_value())); + } else { + childException = result.assume_error(); + } + }) + .attach(Finally{[this, goal] { + childTerminated(goal); + }})); } -void Worker::childTerminated(Goal * goal) +void Worker::childTerminated(GoalPtr goal) { - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; - - if (i->inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - assert(nrSubstitutions > 0); - nrSubstitutions--; - break; - case JobCategory::Build: - assert(nrLocalBuilds > 0); - nrLocalBuilds--; - break; - default: - abort(); - } + if (childFinished) { + childFinished->fulfill(); } - - children.erase(i); - - /* Wake up goals waiting for a build slot. */ - for (auto & j : wantingToBuild) { - GoalPtr goal = j.lock(); - if (goal) wakeUp(goal); - } - - wantingToBuild.clear(); -} - - -void Worker::waitForBuildSlot(GoalPtr goal) -{ - goal->trace("wait for build slot"); - bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution; - if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) || - (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs)) - wakeUp(goal); /* we can do it right away */ - else - wantingToBuild.insert(goal); -} - - -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); } @@ -342,7 +293,7 @@ void Worker::updateStatistics() } } -Goals Worker::run(std::function<Goals (GoalFactory &)> req) +std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req) { auto _topGoals = req(goalFactory()); @@ -352,7 +303,10 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) updateStatistics(); - topGoals = _topGoals; + topGoals.clear(); + for (auto & [goal, _promise] : _topGoals) { + topGoals.insert(goal); + } debug("entered goal loop"); @@ -375,13 +329,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) awake.clear(); for (auto & goal : awake2) { checkInterrupt(); - /* Make sure that we are always allowed to run at least one substitution. - This prevents infinite waiting. */ - const bool inSlot = goal->jobCategory() == JobCategory::Substitution - ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) - : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); + auto result = goal->work(); + if (result.poll(aio.waitScope)) { + handleWorkResult(goal, result.wait(aio.waitScope).value()); + } else { + childStarted(goal, std::move(result)); + } if (topGoals.empty()) break; // stuff may have been cancelled } @@ -390,169 +343,46 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) + if (!children.isEmpty()) waitForInput(); else { assert(!awake.empty()); } + + if (childException) { + std::rethrow_exception(childException); + } } /* If --keep-going is not set, it's possible that the main goal exited while some of its subgoals were still active. But if --keep-going *is* set, then they must all be finished now. */ assert(!settings.keepGoing || awake.empty()); - assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); - return _topGoals; + std::vector<GoalPtr> results; + for (auto & [i, _p] : _topGoals) { + results.push_back(i); + } + return results; } void Worker::waitForInput() { printMsg(lvlVomit, "waiting for children"); - /* Process output from the file descriptors attached to the - children, namely log output and output path creation commands. - We also use this to detect child termination: if we get EOF on - the logger pipe of a build, we assume that the builder has - terminated. */ - - bool useTimeout = false; - long timeout = 0; - auto before = steady_time_point::clock::now(); - - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count()); - useTimeout = true; - } - - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - useTimeout = true; - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast<std::chrono::seconds>( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - - if (useTimeout) - vomit("sleeping %d seconds", timeout); - - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector<struct pollfd> pollStatus; - std::map<int, size_t> fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; - } - } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } + auto waitFor = [&]{ + auto pair = kj::newPromiseAndFulfiller<void>(); + this->childFinished = kj::mv(pair.fulfiller); + return kj::mv(pair.promise); + }(); - auto after = steady_time_point::clock::now(); - - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); - - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; - } - - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } - - std::set<int> fds2(j->fds); - std::vector<unsigned char> buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast<char *>(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } + if (settings.minFree.get() != 0) { + // Periodicallty wake up to see if we need to run the garbage collector. + waitFor = waitFor.exclusiveJoin(aio.provider->getTimer().afterDelay(10 * kj::SECONDS)); } - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); - } + waitFor.wait(aio.waitScope); } diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 6735ea0b9..925d289bf 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -1,6 +1,7 @@ #pragma once ///@file +#include "async-semaphore.hh" #include "notifying-counter.hh" #include "types.hh" #include "lock.hh" @@ -21,34 +22,16 @@ class DrvOutputSubstitutionGoal; typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point; -/** - * A mapping used to remember for each child process to what goal it - * belongs, and file descriptors for receiving log data and output - * path creation commands. - */ -struct Child -{ - WeakGoalPtr goal; - Goal * goal2; // ugly hackery - std::set<int> fds; - bool inBuildSlot; - /** - * Time we last got output on stdout/stderr - */ - steady_time_point lastOutput; - steady_time_point timeStarted; -}; - /* Forward definition. */ struct HookInstance; class GoalFactory { public: - virtual std::shared_ptr<DerivationGoal> makeDerivationGoal( + virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal ) = 0; - virtual std::shared_ptr<DerivationGoal> makeBasicDerivationGoal( + virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, @@ -58,12 +41,14 @@ public: /** * @ref SubstitutionGoal "substitution goal" */ - virtual std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal( + virtual std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> + makePathSubstitutionGoal( const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt ) = 0; - virtual std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal( + virtual std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> + makeDrvOutputSubstitutionGoal( const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt @@ -75,7 +60,8 @@ public: * It will be a `DerivationGoal` for a `DerivedPath::Built` or * a `SubstitutionGoal` for a `DerivedPath::Opaque`. */ - virtual GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0; + virtual std::pair<GoalPtr, kj::Promise<void>> + makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0; }; // elaborate hoax to let goals access factory methods while hiding them from the public @@ -111,44 +97,27 @@ private: */ WeakGoals awake; - /** - * Goals waiting for a build slot. - */ - WeakGoals wantingToBuild; - - /** - * Child processes currently running. - */ - std::list<Child> children; - - /** - * Number of build slots occupied. This includes local builds but does not - * include substitutions or remote builds via the build hook. - */ - unsigned int nrLocalBuilds; - - /** - * Number of substitution slots occupied. - */ - unsigned int nrSubstitutions; - + template<typename G> + struct CachedGoal + { + std::weak_ptr<G> goal; + kj::Own<kj::ForkedPromise<void>> promise; + kj::Own<kj::PromiseFulfiller<void>> fulfiller; + + CachedGoal() + { + auto pf = kj::newPromiseAndFulfiller<void>(); + promise = kj::heap(pf.promise.fork()); + fulfiller = std::move(pf.fulfiller); + } + }; /** * Maps used to prevent multiple instantiations of a goal for the * same derivation / path. */ - std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals; - std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals; - std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; - - /** - * Goals sleeping for a few seconds (polling a lock). - */ - WeakGoals waitingForAWhile; - - /** - * Last time the goals in `waitingForAWhile` where woken up. - */ - steady_time_point lastWokenUp; + std::map<StorePath, CachedGoal<DerivationGoal>> derivationGoals; + std::map<StorePath, CachedGoal<PathSubstitutionGoal>> substitutionGoals; + std::map<DrvOutput, CachedGoal<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; /** * Cache for pathContentsGood(). @@ -179,19 +148,7 @@ private: void goalFinished(GoalPtr goal, Goal::Finished & f); void handleWorkResult(GoalPtr goal, Goal::WorkResult how); - /** - * Put `goal` to sleep until a build slot becomes available (which - * might be right away). - */ - void waitForBuildSlot(GoalPtr goal); - - /** - * Wait for a few seconds and then retry this goal. Used when - * waiting for a lock held by another process. This kind of - * polling is inefficient, but POSIX doesn't really provide a way - * to wait for multiple locks in the main select() loop. - */ - void waitForAWhile(GoalPtr goal); + kj::Own<kj::PromiseFulfiller<void>> childFinished; /** * Wake up a goal (i.e., there is something for it to do). @@ -209,11 +166,14 @@ private: void removeGoal(GoalPtr goal); /** - * Registers a running child process. `inBuildSlot` means that - * the process counts towards the jobs limit. + * Registers a running child process. + */ + void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise); + + /** + * Unregisters a running child process. */ - void childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot); + void childTerminated(GoalPtr goal); /** * Pass current stats counters to the logger for progress bar updates. @@ -239,7 +199,13 @@ public: Store & store; Store & evalStore; kj::AsyncIoContext & aio; + AsyncSemaphore substitutions, localBuilds; +private: + kj::TaskSet children; + std::exception_ptr childException; + +public: struct HookState { std::unique_ptr<HookInstance> instance; @@ -277,21 +243,31 @@ public: * @ref DerivationGoal "derivation goal" */ private: - std::shared_ptr<DerivationGoal> makeDerivationGoalCommon( + std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoalCommon( const StorePath & drvPath, const OutputsSpec & wantedOutputs, std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal); - std::shared_ptr<DerivationGoal> makeDerivationGoal( + std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override; - std::shared_ptr<DerivationGoal> makeBasicDerivationGoal( + std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override; /** * @ref SubstitutionGoal "substitution goal" */ - std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override; - std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override; + std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> + makePathSubstitutionGoal( + const StorePath & storePath, + RepairFlag repair = NoRepair, + std::optional<ContentAddress> ca = std::nullopt + ) override; + std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> + makeDrvOutputSubstitutionGoal( + const DrvOutput & id, + RepairFlag repair = NoRepair, + std::optional<ContentAddress> ca = std::nullopt + ) override; /** * Make a goal corresponding to the `DerivedPath`. @@ -299,18 +275,16 @@ private: * It will be a `DerivationGoal` for a `DerivedPath::Built` or * a `SubstitutionGoal` for a `DerivedPath::Opaque`. */ - GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; + std::pair<GoalPtr, kj::Promise<void>> + makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; public: - /** - * Unregisters a running child process. - */ - void childTerminated(Goal * goal); + using Targets = std::map<GoalPtr, kj::Promise<void>>; /** * Loop until the specified top-level goals have finished. */ - Goals run(std::function<Goals (GoalFactory &)> req); + std::vector<GoalPtr> run(std::function<Targets (GoalFactory &)> req); /*** * The exit status in case of failure. diff --git a/src/nix-channel/meson.build b/src/nix-channel/meson.build index 952dfdb78..97b92d789 100644 --- a/src/nix-channel/meson.build +++ b/src/nix-channel/meson.build @@ -1,5 +1 @@ -configure_file( - input : 'unpack-channel.nix', - output : 'unpack-channel.nix', - copy : true, -) +fs.copyfile('unpack-channel.nix') |