aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/manual/meson.build10
-rw-r--r--meson.build9
-rw-r--r--misc/bash/meson.build7
-rw-r--r--misc/fish/meson.build7
-rw-r--r--misc/meson.build6
-rw-r--r--misc/zsh/meson.build7
-rw-r--r--scripts/meson.build7
-rw-r--r--src/libstore/build/derivation-goal.cc234
-rw-r--r--src/libstore/build/derivation-goal.hh30
-rw-r--r--src/libstore/build/drv-output-substitution-goal.cc33
-rw-r--r--src/libstore/build/drv-output-substitution-goal.hh6
-rw-r--r--src/libstore/build/entry-points.cc30
-rw-r--r--src/libstore/build/goal.cc42
-rw-r--r--src/libstore/build/goal.hh69
-rw-r--r--src/libstore/build/local-derivation-goal.cc21
-rw-r--r--src/libstore/build/local-derivation-goal.hh2
-rw-r--r--src/libstore/build/substitution-goal.cc44
-rw-r--r--src/libstore/build/substitution-goal.hh11
-rw-r--r--src/libstore/build/worker.cc398
-rw-r--r--src/libstore/build/worker.hh144
-rw-r--r--src/nix-channel/meson.build6
21 files changed, 542 insertions, 581 deletions
diff --git a/doc/manual/meson.build b/doc/manual/meson.build
index f53d41b5d..38aad55b5 100644
--- a/doc/manual/meson.build
+++ b/doc/manual/meson.build
@@ -126,20 +126,18 @@ manual = custom_target(
'manual',
'markdown',
],
+ install_dir : [
+ datadir / 'doc/nix',
+ false,
+ ],
depfile : 'manual.d',
env : {
'RUST_LOG': 'info',
'MDBOOK_SUBSTITUTE_SEARCH': meson.current_build_dir() / 'src',
},
)
-manual_html = manual[0]
manual_md = manual[1]
-install_subdir(
- manual_html.full_path(),
- install_dir : datadir / 'doc/nix',
-)
-
nix_nested_manpages = [
[ 'nix-env',
[
diff --git a/meson.build b/meson.build
index f89f5a016..ea2050b6b 100644
--- a/meson.build
+++ b/meson.build
@@ -47,6 +47,7 @@
# in the build directory.
project('lix', 'cpp', 'rust',
+ meson_version : '>=1.4.0',
version : run_command('bash', '-c', 'echo -n $(jq -r .version < ./version.json)$VERSION_SUFFIX', check : true).stdout().strip(),
default_options : [
'cpp_std=c++2a',
@@ -593,10 +594,10 @@ run_command(
)
if is_darwin
- configure_file(
- input : 'misc/launchd/org.nixos.nix-daemon.plist.in',
- output : 'org.nixos.nix-daemon.plist',
- copy : true,
+ fs.copyfile(
+ 'misc/launchd/org.nixos.nix-daemon.plist.in',
+ 'org.nixos.nix-daemon.plist',
+ install : true,
install_dir : prefix / 'Library/LaunchDaemons',
)
endif
diff --git a/misc/bash/meson.build b/misc/bash/meson.build
index 75acce2ea..178692536 100644
--- a/misc/bash/meson.build
+++ b/misc/bash/meson.build
@@ -1,8 +1,7 @@
-configure_file(
- input : 'completion.sh',
- output : 'nix',
+fs.copyfile(
+ 'completion.sh',
+ 'nix',
install : true,
install_dir : datadir / 'bash-completion/completions',
install_mode : 'rw-r--r--',
- copy : true,
)
diff --git a/misc/fish/meson.build b/misc/fish/meson.build
index d54de9a13..7f9cd0896 100644
--- a/misc/fish/meson.build
+++ b/misc/fish/meson.build
@@ -1,8 +1,7 @@
-configure_file(
- input : 'completion.fish',
- output : 'nix.fish',
+fs.copyfile(
+ 'completion.fish',
+ 'nix.fish',
install : true,
install_dir : datadir / 'fish/vendor_completions.d',
install_mode : 'rw-r--r--',
- copy : true,
)
diff --git a/misc/meson.build b/misc/meson.build
index bf3c157f7..4e2f6aacf 100644
--- a/misc/meson.build
+++ b/misc/meson.build
@@ -5,8 +5,4 @@ subdir('zsh')
subdir('systemd')
subdir('flake-registry')
-runinpty = configure_file(
- copy : true,
- input : meson.current_source_dir() / 'runinpty.py',
- output : 'runinpty.py',
-)
+runinpty = fs.copyfile('runinpty.py')
diff --git a/misc/zsh/meson.build b/misc/zsh/meson.build
index 8063a5cb8..bd388a31f 100644
--- a/misc/zsh/meson.build
+++ b/misc/zsh/meson.build
@@ -1,10 +1,9 @@
foreach script : [ [ 'completion.zsh', '_nix' ], [ 'run-help-nix' ] ]
- configure_file(
- input : script[0],
- output : script.get(1, script[0]),
+ fs.copyfile(
+ script[0],
+ script.get(1, script[0]),
install : true,
install_dir : datadir / 'zsh/site-functions',
install_mode : 'rw-r--r--',
- copy : true,
)
endforeach
diff --git a/scripts/meson.build b/scripts/meson.build
index c916c8efa..e35c6cbb0 100644
--- a/scripts/meson.build
+++ b/scripts/meson.build
@@ -8,12 +8,7 @@ configure_file(
}
)
-# https://github.com/mesonbuild/meson/issues/860
-configure_file(
- input : 'nix-profile.sh.in',
- output : 'nix-profile.sh.in',
- copy : true,
-)
+fs.copyfile('nix-profile.sh.in')
foreach rc : [ '.sh', '.fish', '-daemon.sh', '-daemon.fish' ]
configure_file(
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index 827c9f541..b8c4d278d 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -11,7 +11,13 @@
#include "drv-output-substitution-goal.hh"
#include "strings.hh"
+#include <boost/outcome/try.hpp>
#include <fstream>
+#include <kj/array.h>
+#include <kj/async-unix.h>
+#include <kj/async.h>
+#include <kj/debug.h>
+#include <kj/vector.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -131,9 +137,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex)
}
-kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
@@ -170,7 +176,7 @@ try {
state = &DerivationGoal::loadDerivation;
- return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}};
+ return waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath));
} catch (...) {
return {std::current_exception()};
}
@@ -269,13 +275,13 @@ try {
/* We are first going to try to create the invalid output paths
through substitutes. If that doesn't work, we'll build
them. */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies;
if (settings.useSubstitutes) {
if (parsedDrv->substitutesAllowed()) {
for (auto & [outputName, status] : initialOutputs) {
if (!status.wanted) continue;
if (!status.known)
- result.goals.insert(
+ dependencies.add(
worker.goalFactory().makeDrvOutputSubstitutionGoal(
DrvOutput{status.outputHash, outputName},
buildMode == bmRepair ? Repair : NoRepair
@@ -283,7 +289,7 @@ try {
);
else {
auto * cap = getDerivationCA(*drv);
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(
status.known->path,
buildMode == bmRepair ? Repair : NoRepair,
cap ? std::optional { *cap } : std::nullopt));
@@ -294,11 +300,11 @@ try {
}
}
- if (result.goals.empty()) { /* to prevent hang (no wake-up event) */
+ if (dependencies.empty()) { /* to prevent hang (no wake-up event) */
return outputsSubstitutionTried(inBuildSlot);
} else {
state = &DerivationGoal::outputsSubstitutionTried;
- return {std::move(result)};
+ return waitForGoals(dependencies.releaseAsArray());
}
} catch (...) {
return {std::current_exception()};
@@ -380,7 +386,7 @@ try {
produced using a substitute. So we have to build instead. */
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept
try {
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies;
/* At this point we are building all outputs, so if more are wanted there
is no need to restart. */
@@ -393,7 +399,7 @@ try {
addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) {
if (!inputNode.value.empty())
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = inputDrv,
.outputs = inputNode.value,
@@ -438,14 +444,14 @@ try {
if (!settings.useSubstitutes)
throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled",
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i));
}
- if (result.goals.empty()) {/* to prevent hang (no wake-up event) */
+ if (dependencies.empty()) {/* to prevent hang (no wake-up event) */
return inputsRealised(inBuildSlot);
} else {
state = &DerivationGoal::inputsRealised;
- return {result};
+ return waitForGoals(dependencies.releaseAsArray());
}
} catch (...) {
return {std::current_exception()};
@@ -488,7 +494,7 @@ try {
}
/* Check each path (slow!). */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies;
for (auto & i : outputClosure) {
if (worker.pathContentsGood(i)) continue;
printError(
@@ -496,9 +502,9 @@ try {
worker.store.printStorePath(i), worker.store.printStorePath(drvPath));
auto drvPath2 = outputsToDrv.find(i);
if (drvPath2 == outputsToDrv.end())
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair));
else
- result.goals.insert(worker.goalFactory().makeGoal(
+ dependencies.add(worker.goalFactory().makeGoal(
DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath2->second),
.outputs = OutputsSpec::All { },
@@ -506,12 +512,12 @@ try {
bmRepair));
}
- if (result.goals.empty()) {
+ if (dependencies.empty()) {
return {done(BuildResult::AlreadyValid, assertPathValidity())};
}
state = &DerivationGoal::closureRepaired;
- return {result};
+ return waitForGoals(dependencies.releaseAsArray());
} catch (...) {
return {std::current_exception()};
}
@@ -611,11 +617,12 @@ try {
worker.store.printStorePath(pathResolved),
});
- resolvedDrvGoal = worker.goalFactory().makeDerivationGoal(
+ auto dependency = worker.goalFactory().makeDerivationGoal(
pathResolved, wantedOutputs, buildMode);
+ resolvedDrvGoal = dependency.first;
state = &DerivationGoal::resolvedFinished;
- return {WaitForGoals{{resolvedDrvGoal}}};
+ return waitForGoals(std::move(dependency));
}
std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths;
@@ -733,7 +740,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
- return {WaitForAWhile{}};
+ return waitForAWhile();
}
actLock.reset();
@@ -773,32 +780,32 @@ try {
auto hookReply = tryBuildHook(inBuildSlot);
auto result = std::visit(
overloaded{
- [&](HookReply::Accept & a) -> std::optional<WorkResult> {
+ [&](HookReply::Accept & a) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Yes, it has started doing so. Wait until we get
EOF from the hook. */
actLock.reset();
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
- return WaitForWorld{std::move(a.fds), false};
+ return {{WaitForWorld{std::move(a.promise)}}};
},
- [&](HookReply::Postpone) -> std::optional<WorkResult> {
+ [&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Not now; wait until at least one child finishes or
the wake-up timeout expires. */
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
outputLocks.unlock();
- return WaitForAWhile{};
+ return waitForAWhile();
},
- [&](HookReply::Decline) -> std::optional<WorkResult> {
+ [&](HookReply::Decline) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* We should do it ourselves. */
return std::nullopt;
},
},
hookReply);
if (result) {
- return {std::move(*result)};
+ return std::move(*result);
}
}
@@ -977,6 +984,7 @@ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot
try {
trace("build done");
+ slotToken = {};
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
cleanupPreChildKill();
@@ -992,9 +1000,6 @@ try {
buildResult.timesBuilt++;
buildResult.stopTime = time(0);
- /* So the child is gone now. */
- worker.childTerminated(this);
-
/* Close the read side of the logger pipe. */
closeReadPipes();
@@ -1266,12 +1271,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Create the log file and pipe. */
Path logFile = openLogFile();
- std::set<int> fds;
- fds.insert(hook->fromHook.get());
- fds.insert(hook->builderOut.get());
builderOutFD = &hook->builderOut;
-
- return HookReply::Accept{std::move(fds)};
+ return HookReply::Accept{handleChildOutput()};
}
@@ -1331,23 +1332,69 @@ void DerivationGoal::closeLogFile()
}
-Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data)
+Goal::Finished DerivationGoal::tooMuchLogs()
{
- assert(builderOutFD);
+ killChild();
+ return done(
+ BuildResult::LogLimitExceeded, {},
+ Error("%s killed after writing more than %d bytes of log output",
+ getName(), settings.maxLogSize));
+}
- auto tooMuchLogs = [&] {
- killChild();
- return done(
- BuildResult::LogLimitExceeded, {},
- Error("%s killed after writing more than %d bytes of log output",
- getName(), settings.maxLogSize));
- };
+struct DerivationGoal::InputStream final : private kj::AsyncObject
+{
+ int fd;
+ kj::UnixEventPort::FdObserver observer;
+
+ InputStream(kj::UnixEventPort & ep, int fd)
+ : fd(fd)
+ , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ)
+ {
+ int flags = fcntl(fd, F_GETFL);
+ if (flags < 0) {
+ throw SysError("fcntl(F_GETFL) failed on fd %i", fd);
+ }
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ throw SysError("fcntl(F_SETFL) failed on fd %i", fd);
+ }
+ }
+
+ kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer)
+ {
+ const auto res = ::read(fd, buffer.begin(), buffer.size());
+ // closing a pty endpoint causes EIO on the other endpoint. stock kj streams
+ // do not handle this and throw exceptions we can't ask for errno instead :(
+ // (we can't use `errno` either because kj may well have mangled it by now.)
+ if (res == 0 || (res == -1 && errno == EIO)) {
+ return std::string_view{};
+ }
+
+ KJ_NONBLOCKING_SYSCALL(res) {}
+
+ if (res > 0) {
+ return std::string_view{buffer.begin(), static_cast<size_t>(res)};
+ }
+
+ return observer.whenBecomesReadable().then([this, buffer] {
+ return read(buffer);
+ });
+ }
+};
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- // local & `ssh://`-builds are dealt with here.
- if (fd == builderOutFD->get()) {
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
for (auto c : data)
@@ -1362,10 +1409,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
}
if (logSink) (*logSink)(data);
- return StillAlive{};
}
+} catch (...) {
+ co_return std::current_exception();
+}
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- if (hook && fd == hook->fromHook.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine);
@@ -1381,7 +1440,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
(fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n";
logSize += logLine.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
(*logSink)(logLine);
} else if (type == resSetPhase && ! fields.is_null()) {
@@ -1405,16 +1464,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
} else
currentHookLine += c;
}
-
- return StillAlive{};
+} catch (...) {
+ co_return std::current_exception();
}
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept
+try {
+ assert(builderOutFD);
+
+ auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get());
+ kj::Own<InputStream> hookIn;
+ if (hook) {
+ hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get());
+ }
+
+ auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn));
-void DerivationGoal::handleEOF(int fd)
+ if (respectsTimeouts() && settings.buildTimeout != 0) {
+ handlers = handlers.exclusiveJoin(
+ worker.aio.provider->getTimer()
+ .afterDelay(settings.buildTimeout.get() * kj::SECONDS)
+ .then([this]() -> Outcome<void, Finished> {
+ return timedOut(
+ Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
+ );
+ })
+ );
+ }
+
+ return handlers.then([this](auto r) -> Outcome<void, Finished> {
+ if (!currentLogLine.empty()) flushLine();
+ return r;
+ });
+} catch (...) {
+ return {std::current_exception()};
+}
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept
{
- if (!currentLogLine.empty()) flushLine();
+ while (true) {
+ const auto stash = lastChildActivity;
+ auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS;
+ co_await worker.aio.provider->getTimer().atTime(waitUntil);
+ if (lastChildActivity == stash) {
+ co_return timedOut(
+ Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime)
+ );
+ }
+ }
}
+kj::Promise<Outcome<void, Goal::Finished>>
+DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
+{
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ auto handlers = kj::joinPromisesFailFast([&] {
+ kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2};
+
+ parts.add(handleBuilderOutput(builderIn));
+ if (hookIn) {
+ parts.add(handleHookOutput(*hookIn));
+ }
+
+ return parts.releaseAsArray();
+ }());
+
+ if (respectsTimeouts() && settings.maxSilentTime != 0) {
+ handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) {
+ return kj::arr(std::move(r));
+ }));
+ }
+
+ for (auto r : co_await handlers) {
+ BOOST_OUTCOME_CO_TRYV(r);
+ }
+ co_return result::success();
+}
void DerivationGoal::flushLine()
{
diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh
index 020388d5a..d60bb0b4c 100644
--- a/src/libstore/build/derivation-goal.hh
+++ b/src/libstore/build/derivation-goal.hh
@@ -8,6 +8,7 @@
#include "store-api.hh"
#include "pathlocks.hh"
#include "goal.hh"
+#include <kj/time.h>
namespace nix {
@@ -17,7 +18,7 @@ struct HookInstance;
struct HookReplyBase {
struct [[nodiscard]] Accept {
- std::set<int> fds;
+ kj::Promise<Outcome<void, Goal::Finished>> promise;
};
struct [[nodiscard]] Decline {};
struct [[nodiscard]] Postpone {};
@@ -70,6 +71,8 @@ struct InitialOutput {
*/
struct DerivationGoal : public Goal
{
+ struct InputStream;
+
/**
* Whether to use an on-disk .drv file.
*/
@@ -242,11 +245,11 @@ struct DerivationGoal : public Goal
BuildMode buildMode = bmNormal);
virtual ~DerivationGoal() noexcept(false);
- Finished timedOut(Error && ex) override;
+ Finished timedOut(Error && ex);
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* Add wanted outputs to an already existing derivation goal.
@@ -312,13 +315,19 @@ struct DerivationGoal : public Goal
virtual void cleanupPostOutputsRegisteredModeCheck();
virtual void cleanupPostOutputsRegisteredModeNonCheck();
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
- void handleEOF(int fd) override;
+protected:
+ kj::TimePoint lastChildActivity = kj::minValue;
+
+ kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept;
+ kj::Promise<Outcome<void, Finished>>
+ handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept;
+ kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept;
+ Finished tooMuchLogs();
void flushLine();
+public:
/**
* Wrappers around the corresponding Store methods that first consult the
* derivation. This is currently needed because when there is no drv file
@@ -357,6 +366,11 @@ struct DerivationGoal : public Goal
void waiteeDone(GoalPtr waitee) override;
+ virtual bool respectsTimeouts()
+ {
+ return false;
+ }
+
StorePathSet exportReferences(const StorePathSet & storePaths);
JobCategory jobCategory() const override {
diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc
index 7986123cc..80b2c4cfb 100644
--- a/src/libstore/build/drv-output-substitution-goal.cc
+++ b/src/libstore/build/drv-output-substitution-goal.cc
@@ -4,6 +4,9 @@
#include "worker.hh"
#include "substitution-goal.hh"
#include "signals.hh"
+#include <kj/array.h>
+#include <kj/async.h>
+#include <kj/vector.h>
namespace nix {
@@ -42,7 +45,10 @@ try {
trace("trying next substituter");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
@@ -69,25 +75,28 @@ try {
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>();
- downloadState->outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ downloadState->outPipe = kj::mv(pipe.fulfiller);
downloadState->result =
std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] {
+ Finally updateStats([&]() { downloadState->outPipe->fulfill(); });
ReceiveInterrupts receiveInterrupts;
- Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
return sub->queryRealisation(id);
});
state = &DrvOutputSubstitutionGoal::realisationFetched;
- return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}};
+ return {WaitForWorld{
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
+ }};
} catch (...) {
return {std::current_exception()};
}
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
try {
- worker.childTerminated(this);
maintainRunningSubstitutions.reset();
+ slotToken = {};
try {
outputInfo = downloadState->result.get();
@@ -100,7 +109,7 @@ try {
return tryNext(inBuildSlot);
}
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies;
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId);
@@ -116,17 +125,17 @@ try {
);
return tryNext(inBuildSlot);
}
- result.goals.insert(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId));
+ dependencies.add(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId));
}
}
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath));
- if (result.goals.empty()) {
+ if (dependencies.empty()) {
return outPathValid(inBuildSlot);
} else {
state = &DrvOutputSubstitutionGoal::outPathValid;
- return {std::move(result)};
+ return waitForGoals(dependencies.releaseAsArray());
}
} catch (...) {
return {std::current_exception()};
@@ -166,9 +175,9 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}
-kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh
index f33196665..805b65bfa 100644
--- a/src/libstore/build/drv-output-substitution-goal.hh
+++ b/src/libstore/build/drv-output-substitution-goal.hh
@@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal {
struct DownloadState
{
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
std::future<std::shared_ptr<const Realisation>> result;
};
@@ -74,11 +74,9 @@ public:
kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished() noexcept;
- Finished timedOut(Error && ex) override { abort(); };
-
std::string key() override;
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
JobCategory jobCategory() const override {
return JobCategory::Substitution;
diff --git a/src/libstore/build/entry-points.cc b/src/libstore/build/entry-points.cc
index a0f18a02c..27c341295 100644
--- a/src/libstore/build/entry-points.cc
+++ b/src/libstore/build/entry-points.cc
@@ -17,9 +17,9 @@ void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMod
Worker worker(*this, evalStore ? *evalStore : *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- Goals goals;
+ Worker::Targets goals;
for (auto & br : reqs)
- goals.insert(gf.makeGoal(br, buildMode));
+ goals.emplace(gf.makeGoal(br, buildMode));
return goals;
});
@@ -60,11 +60,11 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults(
std::vector<std::pair<const DerivedPath &, GoalPtr>> state;
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- Goals goals;
+ Worker::Targets goals;
for (const auto & req : reqs) {
auto goal = gf.makeGoal(req, buildMode);
- goals.insert(goal);
- state.push_back({req, goal});
+ state.push_back({req, goal.first});
+ goals.emplace(std::move(goal));
}
return goals;
});
@@ -84,8 +84,10 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat
Worker worker(*this, *this, aio);
try {
- auto goals = runWorker(worker, [&](GoalFactory & gf) -> Goals {
- return Goals{gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)};
+ auto goals = runWorker(worker, [&](GoalFactory & gf) {
+ Worker::Targets goals;
+ goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode));
+ return goals;
});
auto goal = *goals.begin();
return goal->buildResult.restrictTo(DerivedPath::Built {
@@ -110,7 +112,9 @@ void Store::ensurePath(const StorePath & path)
Worker worker(*this, *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- return Goals{gf.makePathSubstitutionGoal(path)};
+ Worker::Targets goals;
+ goals.emplace(gf.makePathSubstitutionGoal(path));
+ return goals;
});
auto goal = *goals.begin();
@@ -130,7 +134,9 @@ void Store::repairPath(const StorePath & path)
Worker worker(*this, *this, aio);
auto goals = runWorker(worker, [&](GoalFactory & gf) {
- return Goals{gf.makePathSubstitutionGoal(path, Repair)};
+ Worker::Targets goals;
+ goals.emplace(gf.makePathSubstitutionGoal(path, Repair));
+ return goals;
});
auto goal = *goals.begin();
@@ -140,14 +146,16 @@ void Store::repairPath(const StorePath & path)
auto info = queryPathInfo(path);
if (info->deriver && isValidPath(*info->deriver)) {
worker.run([&](GoalFactory & gf) {
- return Goals{gf.makeGoal(
+ Worker::Targets goals;
+ goals.emplace(gf.makeGoal(
DerivedPath::Built{
.drvPath = makeConstantStorePathRef(*info->deriver),
// FIXME: Should just build the specific output we need.
.outputs = OutputsSpec::All{},
},
bmRepair
- )};
+ ));
+ return goals;
});
} else
throw Error(worker.failingExitStatus(), "cannot repair path '%s'", printStorePath(path));
diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc
index 82861ad2b..8a2f4ab35 100644
--- a/src/libstore/build/goal.cc
+++ b/src/libstore/build/goal.cc
@@ -1,4 +1,7 @@
#include "goal.hh"
+#include "async-collect.hh"
+#include "worker.hh"
+#include <kj/time.h>
namespace nix {
@@ -15,4 +18,43 @@ void Goal::trace(std::string_view s)
debug("%1%: %2%", name, s);
}
+kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile()
+try {
+ trace("wait for a while");
+ /* If we are polling goals that are waiting for a lock, then wake
+ up after a few seconds at most. */
+ co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
+ co_return ContinueImmediately{};
+} catch (...) {
+ co_return std::current_exception();
+}
+
+kj::Promise<Result<Goal::WorkResult>>
+Goal::waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept
+try {
+ auto left = dependencies.size();
+ auto collectDeps = asyncCollect(std::move(dependencies));
+
+ while (auto item = co_await collectDeps.next()) {
+ left--;
+ auto & dep = *item;
+
+ trace(fmt("waitee '%s' done; %d left", dep->name, left));
+
+ if (dep->exitCode != Goal::ecSuccess) ++nrFailed;
+ if (dep->exitCode == Goal::ecNoSubstituters) ++nrNoSubstituters;
+ if (dep->exitCode == Goal::ecIncompleteClosure) ++nrIncompleteClosure;
+
+ waiteeDone(dep);
+
+ if (dep->exitCode == ecFailed && !settings.keepGoing) {
+ co_return result::success(ContinueImmediately{});
+ }
+ }
+
+ co_return result::success(ContinueImmediately{});
+} catch (...) {
+ co_return result::failure(std::current_exception());
+}
+
}
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index 189505308..e7a500a00 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -1,10 +1,12 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "result.hh"
#include "types.hh"
#include "store-api.hh"
#include "build-result.hh"
+#include <concepts> // IWYU pragma: keep
#include <kj/async.h>
namespace nix {
@@ -70,17 +72,6 @@ struct Goal
const bool isDependency;
/**
- * Goals that this goal is waiting for.
- */
- Goals waitees;
-
- /**
- * Goals waiting for this one to finish. Must use weak pointers
- * here to prevent cycles.
- */
- WeakGoals waiters;
-
- /**
* Number of goals we are/were waiting for that have failed.
*/
size_t nrFailed = 0;
@@ -112,18 +103,20 @@ struct Goal
*/
BuildResult buildResult;
+ // for use by Worker only. will go away once work() is a promise.
+ kj::Own<kj::PromiseFulfiller<void>> notify;
+
+protected:
+ AsyncSemaphore::Token slotToken;
+
public:
+ struct Finished;
+
struct [[nodiscard]] StillAlive {};
- struct [[nodiscard]] WaitForSlot {};
- struct [[nodiscard]] WaitForAWhile {};
struct [[nodiscard]] ContinueImmediately {};
- struct [[nodiscard]] WaitForGoals {
- Goals goals;
- };
struct [[nodiscard]] WaitForWorld {
- std::set<int> fds;
- bool inBuildSlot;
+ kj::Promise<Outcome<void, Finished>> promise;
};
struct [[nodiscard]] Finished {
ExitCode exitCode;
@@ -137,10 +130,7 @@ public:
struct [[nodiscard]] WorkResult : std::variant<
StillAlive,
- WaitForSlot,
- WaitForAWhile,
ContinueImmediately,
- WaitForGoals,
WaitForWorld,
Finished>
{
@@ -148,6 +138,20 @@ public:
using variant::variant;
};
+protected:
+ kj::Promise<Result<WorkResult>> waitForAWhile();
+ kj::Promise<Result<WorkResult>>
+ waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept;
+
+ template<std::derived_from<Goal>... G>
+ kj::Promise<Result<Goal::WorkResult>>
+ waitForGoals(std::pair<std::shared_ptr<G>, kj::Promise<void>>... goals) noexcept
+ {
+ return waitForGoals(kj::arrOf<std::pair<GoalPtr, kj::Promise<void>>>(std::move(goals)...));
+ }
+
+public:
+
/**
* Exception containing an error message, if any.
*/
@@ -163,24 +167,10 @@ public:
trace("goal destroyed");
}
- virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
+ virtual kj::Promise<Result<WorkResult>> work() noexcept = 0;
virtual void waiteeDone(GoalPtr waitee) { }
- virtual WorkResult handleChildOutput(int fd, std::string_view data)
- {
- abort();
- }
-
- virtual void handleEOF(int fd)
- {
- }
-
- virtual bool respectsTimeouts()
- {
- return false;
- }
-
void trace(std::string_view s);
std::string getName() const
@@ -188,13 +178,6 @@ public:
return name;
}
- /**
- * Callback in case of a timeout. It should wake up its waiters,
- * get rid of any running child processes that are being monitored
- * by the worker (important!), etc.
- */
- virtual Finished timedOut(Error && ex) = 0;
-
virtual std::string key() = 0;
virtual void cleanup() { }
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index f14d09652..2443cfb5a 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore()
void LocalDerivationGoal::killChild()
{
if (pid) {
- worker.childTerminated(this);
-
/* If we're using a build user, then there is a tricky race
condition: if we kill the build user before the child has
done its setuid() to the build user uid, then it won't be
@@ -158,8 +156,11 @@ try {
if (!inBuildSlot) {
state = &DerivationGoal::tryToBuild;
outputLocks.unlock();
- if (0U != settings.maxBuildJobs) {
- return {WaitForSlot{}};
+ if (worker.localBuilds.capacity() > 0) {
+ return worker.localBuilds.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
if (getMachines().empty()) {
throw Error(
@@ -214,7 +215,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
- return {WaitForAWhile{}};
+ return waitForAWhile();
}
}
@@ -243,14 +244,14 @@ try {
try {
/* Okay, we have to build. */
- auto fds = startBuilder();
+ auto promise = startBuilder();
/* This state will be reached when we get EOF on the child's
log pipe. */
state = &DerivationGoal::buildDone;
started();
- return {WaitForWorld{std::move(fds), true}};
+ return {WaitForWorld{std::move(promise)}};
} catch (BuildError & e) {
outputLocks.unlock();
@@ -390,7 +391,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
cleanupPostOutputsRegisteredModeCheck();
}
-std::set<int> LocalDerivationGoal::startBuilder()
+// NOTE this one isn't noexcept because it's called from places that expect
+// exceptions to signal failure to launch. we should change this some time.
+kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder()
{
if ((buildUser && buildUser->getUIDCount() != 1)
#if __linux__
@@ -779,7 +782,7 @@ std::set<int> LocalDerivationGoal::startBuilder()
msgs.push_back(std::move(msg));
}
- return {builderOutPTY.get()};
+ return handleChildOutput();
}
diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh
index cd040bc15..6239129ab 100644
--- a/src/libstore/build/local-derivation-goal.hh
+++ b/src/libstore/build/local-derivation-goal.hh
@@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal
/**
* Start building a derivation.
*/
- std::set<int> startBuilder();
+ kj::Promise<Outcome<void, Finished>> startBuilder();
/**
* Fill in the environment for the builder.
diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc
index bd0ffcb9b..74a63ca21 100644
--- a/src/libstore/build/substitution-goal.cc
+++ b/src/libstore/build/substitution-goal.cc
@@ -3,6 +3,8 @@
#include "nar-info.hh"
#include "signals.hh"
#include "finally.hh"
+#include <kj/array.h>
+#include <kj/vector.h>
namespace nix {
@@ -45,9 +47,9 @@ Goal::Finished PathSubstitutionGoal::done(
}
-kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
+kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept
{
- return (this->*state)(inBuildSlot);
+ return (this->*state)(slotToken.valid());
}
@@ -160,16 +162,16 @@ try {
/* To maintain the closure invariant, we first have to realise the
paths referenced by this one. */
- WaitForGoals result;
+ kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies;
for (auto & i : info->references)
if (i != storePath) /* ignore self-references */
- result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i));
+ dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i));
- if (result.goals.empty()) {/* to prevent hang (no wake-up event) */
+ if (dependencies.empty()) {/* to prevent hang (no wake-up event) */
return referencesValid(inBuildSlot);
} else {
state = &PathSubstitutionGoal::referencesValid;
- return {std::move(result)};
+ return waitForGoals(dependencies.releaseAsArray());
}
} catch (...) {
return {std::current_exception()};
@@ -203,21 +205,25 @@ try {
trace("trying to run");
if (!inBuildSlot) {
- return {WaitForSlot{}};
+ return worker.substitutions.acquire().then([this](auto token) {
+ slotToken = std::move(token);
+ return work();
+ });
}
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
- outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ outPipe = kj::mv(pipe.fulfiller);
thr = std::async(std::launch::async, [this]() {
+ /* Wake up the worker loop when we're done. */
+ Finally updateStats([this]() { outPipe->fulfill(); });
+
auto & fetchPath = subPath ? *subPath : storePath;
try {
ReceiveInterrupts receiveInterrupts;
- /* Wake up the worker loop when we're done. */
- Finally updateStats([this]() { outPipe.writeSide.close(); });
-
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
@@ -234,7 +240,9 @@ try {
});
state = &PathSubstitutionGoal::finished;
- return {WaitForWorld{{outPipe.readSide.get()}, true}};
+ return {WaitForWorld{
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
+ }};
} catch (...) {
return {std::current_exception()};
}
@@ -244,9 +252,8 @@ kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuil
try {
trace("substitute finished");
- worker.childTerminated(this);
-
try {
+ slotToken = {};
thr.get();
} catch (std::exception & e) {
printError(e.what());
@@ -288,22 +295,13 @@ try {
}
-Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data)
-{
- return StillAlive{};
-}
-
-
void PathSubstitutionGoal::cleanup()
{
try {
if (thr.valid()) {
// FIXME: signal worker thread to quit.
thr.get();
- worker.childTerminated(this);
}
-
- outPipe.close();
} catch (...) {
ignoreException();
}
diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh
index 3c97b19fd..cef3a4c5c 100644
--- a/src/libstore/build/substitution-goal.hh
+++ b/src/libstore/build/substitution-goal.hh
@@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal
/**
* Pipe for the substituter's standard output.
*/
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
/**
* The substituter thread.
@@ -90,8 +90,6 @@ public:
);
~PathSubstitutionGoal();
- Finished timedOut(Error && ex) override { abort(); };
-
/**
* We prepend "a$" to the key name to ensure substitution goals
* happen before derivation goals.
@@ -101,7 +99,7 @@ public:
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
}
- kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
+ kj::Promise<Result<WorkResult>> work() noexcept override;
/**
* The states.
@@ -112,11 +110,6 @@ public:
kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept;
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
-
/* Called by destructor, can't be overridden */
void cleanup() override final;
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..68071a94c 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -7,10 +7,19 @@
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-#include <poll.h>
-
namespace nix {
+namespace {
+struct ErrorHandler : kj::TaskSet::ErrorHandler
+{
+ void taskFailed(kj::Exception && e) override
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
+ abort();
+ }
+} errorHandler;
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ /* Make sure that we are always allowed to run at least one substitution.
+ This prevents infinite waiting. */
+ , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
+ , localBuilds(settings.maxBuildJobs)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
- nrLocalBuilds = 0;
- nrSubstitutions = 0;
- lastWokenUp = steady_time_point::min();
}
@@ -33,6 +44,7 @@ Worker::~Worker()
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
topGoals.clear();
+ children.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -40,26 +52,28 @@ Worker::~Worker()
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon(
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoalCommon(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs,
std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal)
{
- std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath];
- std::shared_ptr<DerivationGoal> goal = goal_weak.lock();
+ auto & goal_weak = derivationGoals[drvPath];
+ std::shared_ptr<DerivationGoal> goal = goal_weak.goal.lock();
if (!goal) {
goal = mkDrvGoal();
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
} else {
goal->addWantedOutputs(wantedOutputs);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath,
- const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoal(
+ const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode
+)
{
return makeDerivationGoalCommon(
drvPath,
@@ -77,8 +91,12 @@ std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drv
}
-std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath,
- const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode)
+std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeBasicDerivationGoal(
+ const StorePath & drvPath,
+ const BasicDerivation & drv,
+ const OutputsSpec & wantedOutputs,
+ BuildMode buildMode
+)
{
return makeDerivationGoalCommon(
drvPath,
@@ -96,55 +114,63 @@ std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath
}
-std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>>
+Worker::makePathSubstitutionGoal(
+ const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path];
- auto goal = goal_weak.lock(); // FIXME
+ auto & goal_weak = substitutionGoals[path];
+ auto goal = goal_weak.goal.lock(); // FIXME
if (!goal) {
goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca);
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca)
+std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>>
+Worker::makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca
+)
{
- std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id];
- auto goal = goal_weak.lock(); // FIXME
+ auto & goal_weak = drvOutputSubstitutionGoals[id];
+ auto goal = goal_weak.goal.lock(); // FIXME
if (!goal) {
goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca);
- goal_weak = goal;
+ goal->notify = std::move(goal_weak.fulfiller);
+ goal_weak.goal = goal;
wakeUp(goal);
}
- return goal;
+ return {goal, goal_weak.promise->addBranch()};
}
-GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
+std::pair<GoalPtr, kj::Promise<void>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode)
{
return std::visit(overloaded {
- [&](const DerivedPath::Built & bfd) -> GoalPtr {
+ [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<void>> {
if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath))
return makeDerivationGoal(bop->path, bfd.outputs, buildMode);
else
throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented.");
},
- [&](const DerivedPath::Opaque & bo) -> GoalPtr {
+ [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<void>> {
return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair);
},
}, req.raw());
}
-template<typename K, typename G>
-static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap)
+template<typename G>
+static void removeGoal(std::shared_ptr<G> goal, auto & goalMap)
{
/* !!! inefficient */
for (auto i = goalMap.begin();
i != goalMap.end(); )
- if (i->second.lock() == goal) {
+ if (i->second.goal.lock() == goal) {
auto j = i; ++j;
goalMap.erase(i);
i = j;
@@ -165,33 +191,8 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
hashMismatch |= f.hashMismatch;
checkMismatch |= f.checkMismatch;
- for (auto & i : goal->waiters) {
- if (GoalPtr waiting = i.lock()) {
- assert(waiting->waitees.count(goal));
- waiting->waitees.erase(goal);
-
- waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size()));
-
- if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed;
- if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters;
- if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure;
-
- if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) {
- /* If we failed and keepGoing is not set, we remove all
- remaining waitees. */
- for (auto & i : waiting->waitees) {
- i->waiters.extract(waiting);
- }
- waiting->waitees.clear();
-
- wakeUp(waiting);
- }
-
- waiting->waiteeDone(goal);
- }
- }
- goal->waiters.clear();
removeGoal(goal);
+ goal->notify->fulfill();
goal->cleanup();
}
@@ -200,20 +201,23 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
std::visit(
overloaded{
[&](Goal::StillAlive) {},
- [&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
- [&](Goal::WaitForAWhile) { waitForAWhile(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
- [&](Goal::WaitForGoals & w) {
- for (auto & dep : w.goals) {
- goal->waitees.insert(dep);
- dep->waiters.insert(goal);
- }
+ [&](Goal::WaitForWorld & w) {
+ childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> {
+ if (r.has_value()) {
+ return {Goal::ContinueImmediately{}};
+ } else if (r.has_error()) {
+ return {std::move(r).error()};
+ } else {
+ return r.exception();
+ }
+ }));
},
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
);
+ updateStatistics();
}
void Worker::removeGoal(GoalPtr goal)
@@ -244,80 +248,27 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot)
+void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise)
{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
- if (inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- nrSubstitutions++;
- break;
- case JobCategory::Build:
- nrLocalBuilds++;
- break;
- default:
- abort();
- }
- }
+ children.add(promise
+ .then([this, goal](auto result) {
+ if (result.has_value()) {
+ handleWorkResult(goal, std::move(result.assume_value()));
+ } else {
+ childException = result.assume_error();
+ }
+ })
+ .attach(Finally{[this, goal] {
+ childTerminated(goal);
+ }}));
}
-void Worker::childTerminated(Goal * goal)
+void Worker::childTerminated(GoalPtr goal)
{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
-
- if (i->inBuildSlot) {
- switch (goal->jobCategory()) {
- case JobCategory::Substitution:
- assert(nrSubstitutions > 0);
- nrSubstitutions--;
- break;
- case JobCategory::Build:
- assert(nrLocalBuilds > 0);
- nrLocalBuilds--;
- break;
- default:
- abort();
- }
+ if (childFinished) {
+ childFinished->fulfill();
}
-
- children.erase(i);
-
- /* Wake up goals waiting for a build slot. */
- for (auto & j : wantingToBuild) {
- GoalPtr goal = j.lock();
- if (goal) wakeUp(goal);
- }
-
- wantingToBuild.clear();
-}
-
-
-void Worker::waitForBuildSlot(GoalPtr goal)
-{
- goal->trace("wait for build slot");
- bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
- if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
- (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
- wakeUp(goal); /* we can do it right away */
- else
- wantingToBuild.insert(goal);
-}
-
-
-void Worker::waitForAWhile(GoalPtr goal)
-{
- debug("wait for a while");
- waitingForAWhile.insert(goal);
}
@@ -342,7 +293,7 @@ void Worker::updateStatistics()
}
}
-Goals Worker::run(std::function<Goals (GoalFactory &)> req)
+std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
{
auto _topGoals = req(goalFactory());
@@ -352,7 +303,10 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
updateStatistics();
- topGoals = _topGoals;
+ topGoals.clear();
+ for (auto & [goal, _promise] : _topGoals) {
+ topGoals.insert(goal);
+ }
debug("entered goal loop");
@@ -375,13 +329,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
awake.clear();
for (auto & goal : awake2) {
checkInterrupt();
- /* Make sure that we are always allowed to run at least one substitution.
- This prevents infinite waiting. */
- const bool inSlot = goal->jobCategory() == JobCategory::Substitution
- ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
- : nrLocalBuilds < settings.maxBuildJobs;
- handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
- updateStatistics();
+ auto result = goal->work();
+ if (result.poll(aio.waitScope)) {
+ handleWorkResult(goal, result.wait(aio.waitScope).value());
+ } else {
+ childStarted(goal, std::move(result));
+ }
if (topGoals.empty()) break; // stuff may have been cancelled
}
@@ -390,169 +343,46 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
+ if (!children.isEmpty())
waitForInput();
else {
assert(!awake.empty());
}
+
+ if (childException) {
+ std::rethrow_exception(childException);
+ }
}
/* If --keep-going is not set, it's possible that the main goal
exited while some of its subgoals were still active. But if
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
- assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
- return _topGoals;
+ std::vector<GoalPtr> results;
+ for (auto & [i, _p] : _topGoals) {
+ results.push_back(i);
+ }
+ return results;
}
void Worker::waitForInput()
{
printMsg(lvlVomit, "waiting for children");
- /* Process output from the file descriptors attached to the
- children, namely log output and output path creation commands.
- We also use this to detect child termination: if we get EOF on
- the logger pipe of a build, we assume that the builder has
- terminated. */
-
- bool useTimeout = false;
- long timeout = 0;
- auto before = steady_time_point::clock::now();
-
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
- }
-
- /* If we are polling goals that are waiting for a lock, then wake
- up after a few seconds at most. */
- if (!waitingForAWhile.empty()) {
- useTimeout = true;
- if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
- timeout = std::max(1L,
- (long) std::chrono::duration_cast<std::chrono::seconds>(
- lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
- } else lastWokenUp = steady_time_point::min();
-
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
+ auto waitFor = [&]{
+ auto pair = kj::newPromiseAndFulfiller<void>();
+ this->childFinished = kj::mv(pair.fulfiller);
+ return kj::mv(pair.promise);
+ }();
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
-
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
- }
-
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
-
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
+ if (settings.minFree.get() != 0) {
+ // Periodicallty wake up to see if we need to run the garbage collector.
+ waitFor = waitFor.exclusiveJoin(aio.provider->getTimer().afterDelay(10 * kj::SECONDS));
}
- if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
- lastWokenUp = after;
- for (auto & i : waitingForAWhile) {
- GoalPtr goal = i.lock();
- if (goal) wakeUp(goal);
- }
- waitingForAWhile.clear();
- }
+ waitFor.wait(aio.waitScope);
}
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index 6735ea0b9..925d289bf 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "async-semaphore.hh"
#include "notifying-counter.hh"
#include "types.hh"
#include "lock.hh"
@@ -21,34 +22,16 @@ class DrvOutputSubstitutionGoal;
typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
-/**
- * A mapping used to remember for each child process to what goal it
- * belongs, and file descriptors for receiving log data and output
- * path creation commands.
- */
-struct Child
-{
- WeakGoalPtr goal;
- Goal * goal2; // ugly hackery
- std::set<int> fds;
- bool inBuildSlot;
- /**
- * Time we last got output on stdout/stderr
- */
- steady_time_point lastOutput;
- steady_time_point timeStarted;
-};
-
/* Forward definition. */
struct HookInstance;
class GoalFactory
{
public:
- virtual std::shared_ptr<DerivationGoal> makeDerivationGoal(
+ virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal(
const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal
) = 0;
- virtual std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
+ virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal(
const StorePath & drvPath,
const BasicDerivation & drv,
const OutputsSpec & wantedOutputs,
@@ -58,12 +41,14 @@ public:
/**
* @ref SubstitutionGoal "substitution goal"
*/
- virtual std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(
+ virtual std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>>
+ makePathSubstitutionGoal(
const StorePath & storePath,
RepairFlag repair = NoRepair,
std::optional<ContentAddress> ca = std::nullopt
) = 0;
- virtual std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(
+ virtual std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>>
+ makeDrvOutputSubstitutionGoal(
const DrvOutput & id,
RepairFlag repair = NoRepair,
std::optional<ContentAddress> ca = std::nullopt
@@ -75,7 +60,8 @@ public:
* It will be a `DerivationGoal` for a `DerivedPath::Built` or
* a `SubstitutionGoal` for a `DerivedPath::Opaque`.
*/
- virtual GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0;
+ virtual std::pair<GoalPtr, kj::Promise<void>>
+ makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0;
};
// elaborate hoax to let goals access factory methods while hiding them from the public
@@ -111,44 +97,27 @@ private:
*/
WeakGoals awake;
- /**
- * Goals waiting for a build slot.
- */
- WeakGoals wantingToBuild;
-
- /**
- * Child processes currently running.
- */
- std::list<Child> children;
-
- /**
- * Number of build slots occupied. This includes local builds but does not
- * include substitutions or remote builds via the build hook.
- */
- unsigned int nrLocalBuilds;
-
- /**
- * Number of substitution slots occupied.
- */
- unsigned int nrSubstitutions;
-
+ template<typename G>
+ struct CachedGoal
+ {
+ std::weak_ptr<G> goal;
+ kj::Own<kj::ForkedPromise<void>> promise;
+ kj::Own<kj::PromiseFulfiller<void>> fulfiller;
+
+ CachedGoal()
+ {
+ auto pf = kj::newPromiseAndFulfiller<void>();
+ promise = kj::heap(pf.promise.fork());
+ fulfiller = std::move(pf.fulfiller);
+ }
+ };
/**
* Maps used to prevent multiple instantiations of a goal for the
* same derivation / path.
*/
- std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals;
- std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals;
- std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
-
- /**
- * Goals sleeping for a few seconds (polling a lock).
- */
- WeakGoals waitingForAWhile;
-
- /**
- * Last time the goals in `waitingForAWhile` where woken up.
- */
- steady_time_point lastWokenUp;
+ std::map<StorePath, CachedGoal<DerivationGoal>> derivationGoals;
+ std::map<StorePath, CachedGoal<PathSubstitutionGoal>> substitutionGoals;
+ std::map<DrvOutput, CachedGoal<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
/**
* Cache for pathContentsGood().
@@ -179,19 +148,7 @@ private:
void goalFinished(GoalPtr goal, Goal::Finished & f);
void handleWorkResult(GoalPtr goal, Goal::WorkResult how);
- /**
- * Put `goal` to sleep until a build slot becomes available (which
- * might be right away).
- */
- void waitForBuildSlot(GoalPtr goal);
-
- /**
- * Wait for a few seconds and then retry this goal. Used when
- * waiting for a lock held by another process. This kind of
- * polling is inefficient, but POSIX doesn't really provide a way
- * to wait for multiple locks in the main select() loop.
- */
- void waitForAWhile(GoalPtr goal);
+ kj::Own<kj::PromiseFulfiller<void>> childFinished;
/**
* Wake up a goal (i.e., there is something for it to do).
@@ -209,11 +166,14 @@ private:
void removeGoal(GoalPtr goal);
/**
- * Registers a running child process. `inBuildSlot` means that
- * the process counts towards the jobs limit.
+ * Registers a running child process.
+ */
+ void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise);
+
+ /**
+ * Unregisters a running child process.
*/
- void childStarted(GoalPtr goal, const std::set<int> & fds,
- bool inBuildSlot);
+ void childTerminated(GoalPtr goal);
/**
* Pass current stats counters to the logger for progress bar updates.
@@ -239,7 +199,13 @@ public:
Store & store;
Store & evalStore;
kj::AsyncIoContext & aio;
+ AsyncSemaphore substitutions, localBuilds;
+private:
+ kj::TaskSet children;
+ std::exception_ptr childException;
+
+public:
struct HookState {
std::unique_ptr<HookInstance> instance;
@@ -277,21 +243,31 @@ public:
* @ref DerivationGoal "derivation goal"
*/
private:
- std::shared_ptr<DerivationGoal> makeDerivationGoalCommon(
+ std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoalCommon(
const StorePath & drvPath, const OutputsSpec & wantedOutputs,
std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal);
- std::shared_ptr<DerivationGoal> makeDerivationGoal(
+ std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal(
const StorePath & drvPath,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override;
- std::shared_ptr<DerivationGoal> makeBasicDerivationGoal(
+ std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal(
const StorePath & drvPath, const BasicDerivation & drv,
const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override;
/**
* @ref SubstitutionGoal "substitution goal"
*/
- std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override;
- std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override;
+ std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>>
+ makePathSubstitutionGoal(
+ const StorePath & storePath,
+ RepairFlag repair = NoRepair,
+ std::optional<ContentAddress> ca = std::nullopt
+ ) override;
+ std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>>
+ makeDrvOutputSubstitutionGoal(
+ const DrvOutput & id,
+ RepairFlag repair = NoRepair,
+ std::optional<ContentAddress> ca = std::nullopt
+ ) override;
/**
* Make a goal corresponding to the `DerivedPath`.
@@ -299,18 +275,16 @@ private:
* It will be a `DerivationGoal` for a `DerivedPath::Built` or
* a `SubstitutionGoal` for a `DerivedPath::Opaque`.
*/
- GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
+ std::pair<GoalPtr, kj::Promise<void>>
+ makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
public:
- /**
- * Unregisters a running child process.
- */
- void childTerminated(Goal * goal);
+ using Targets = std::map<GoalPtr, kj::Promise<void>>;
/**
* Loop until the specified top-level goals have finished.
*/
- Goals run(std::function<Goals (GoalFactory &)> req);
+ std::vector<GoalPtr> run(std::function<Targets (GoalFactory &)> req);
/***
* The exit status in case of failure.
diff --git a/src/nix-channel/meson.build b/src/nix-channel/meson.build
index 952dfdb78..97b92d789 100644
--- a/src/nix-channel/meson.build
+++ b/src/nix-channel/meson.build
@@ -1,5 +1 @@
-configure_file(
- input : 'unpack-channel.nix',
- output : 'unpack-channel.nix',
- copy : true,
-)
+fs.copyfile('unpack-channel.nix')