aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/build
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build')
-rw-r--r--src/libstore/build/derivation-goal.cc177
-rw-r--r--src/libstore/build/derivation-goal.hh28
-rw-r--r--src/libstore/build/drv-output-substitution-goal.cc10
-rw-r--r--src/libstore/build/drv-output-substitution-goal.hh4
-rw-r--r--src/libstore/build/goal.hh25
-rw-r--r--src/libstore/build/local-derivation-goal.cc12
-rw-r--r--src/libstore/build/local-derivation-goal.hh2
-rw-r--r--src/libstore/build/substitution-goal.cc24
-rw-r--r--src/libstore/build/substitution-goal.hh9
-rw-r--r--src/libstore/build/worker.cc193
-rw-r--r--src/libstore/build/worker.hh42
11 files changed, 272 insertions, 254 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc
index 827c9f541..f40611b31 100644
--- a/src/libstore/build/derivation-goal.cc
+++ b/src/libstore/build/derivation-goal.cc
@@ -11,7 +11,10 @@
#include "drv-output-substitution-goal.hh"
#include "strings.hh"
+#include <boost/outcome/try.hpp>
#include <fstream>
+#include <kj/async-unix.h>
+#include <kj/debug.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -780,7 +783,7 @@ try {
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
- return WaitForWorld{std::move(a.fds), false};
+ return WaitForWorld{std::move(a.promise), false};
},
[&](HookReply::Postpone) -> std::optional<WorkResult> {
/* Not now; wait until at least one child finishes or
@@ -992,9 +995,6 @@ try {
buildResult.timesBuilt++;
buildResult.stopTime = time(0);
- /* So the child is gone now. */
- worker.childTerminated(this);
-
/* Close the read side of the logger pipe. */
closeReadPipes();
@@ -1266,12 +1266,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot)
/* Create the log file and pipe. */
Path logFile = openLogFile();
- std::set<int> fds;
- fds.insert(hook->fromHook.get());
- fds.insert(hook->builderOut.get());
builderOutFD = &hook->builderOut;
-
- return HookReply::Accept{std::move(fds)};
+ return HookReply::Accept{handleChildOutput()};
}
@@ -1331,23 +1327,69 @@ void DerivationGoal::closeLogFile()
}
-Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data)
+Goal::Finished DerivationGoal::tooMuchLogs() // kills the child, then returns the LogLimitExceeded result built by done()
{
- assert(builderOutFD);
+ killChild(); // stop the child before producing the result
+ return done(
+ BuildResult::LogLimitExceeded, {},
+ Error("%s killed after writing more than %d bytes of log output",
+ getName(), settings.maxLogSize)); // the message reports the configured limit (settings.maxLogSize)
+}
- auto tooMuchLogs = [&] {
- killChild();
- return done(
- BuildResult::LogLimitExceeded, {},
- Error("%s killed after writing more than %d bytes of log output",
- getName(), settings.maxLogSize));
- };
+struct DerivationGoal::InputStream final : private kj::AsyncObject // reader for one child fd; the fd is switched to non-blocking mode on construction
+{
+ int fd; // descriptor to read from; stored as a plain int and not closed by this struct
+ kj::UnixEventPort::FdObserver observer; // read() uses this to wait until fd becomes readable
+
+ InputStream(kj::UnixEventPort & ep, int fd)
+ : fd(fd)
+ , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ)
+ {
+ int flags = fcntl(fd, F_GETFL); // fetch the current flags so O_NONBLOCK can be added to them
+ if (flags < 0) {
+ throw SysError("fcntl(F_GETFL) failed on fd %i", fd);
+ }
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ throw SysError("fcntl(F_SETFL) failed on fd %i", fd);
+ }
+ }
+
+ kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer) // yields the bytes read as a view into `buffer`, or an empty view at end of stream
+ {
+ const auto res = ::read(fd, buffer.begin(), buffer.size());
+ // Closing a pty endpoint causes EIO on the other endpoint. Stock kj streams
+ // do not handle this: they throw exceptions that we cannot ask for errno,
+ // and the global `errno` may well have been mangled by kj by that point.
+ if (res == 0 || (res == -1 && errno == EIO)) {
+ return std::string_view{}; // EOF, or EIO which is treated the same as EOF
+ }
+
+ KJ_NONBLOCKING_SYSCALL(res) {} // NOTE(review): presumably lets "would block" through and reports other errors; confirm against kj/debug.h
+
+ if (res > 0) {
+ return std::string_view{buffer.begin(), static_cast<size_t>(res)};
+ }
+
+ return observer.whenBecomesReadable().then([this, buffer] { // nothing was read: wait for readability, then try again
+ return read(buffer);
+ });
+ }
+};
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- // local & `ssh://`-builds are dealt with here.
- if (fd == builderOutFD->get()) {
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
for (auto c : data)
@@ -1362,10 +1404,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
}
if (logSink) (*logSink)(data);
- return StillAlive{};
}
+} catch (...) {
+ co_return std::current_exception();
+}
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
+try {
+ auto buf = kj::heapArray<char>(4096);
+ while (true) {
+ auto data = co_await in.read(buf);
+ lastChildActivity = worker.aio.provider->getTimer().now();
+
+ if (data.empty()) {
+ co_return result::success();
+ }
- if (hook && fd == hook->fromHook.get()) {
for (auto c : data)
if (c == '\n') {
auto json = parseJSONMessage(currentHookLine);
@@ -1381,7 +1435,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
(fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n";
logSize += logLine.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
- return tooMuchLogs();
+ co_return tooMuchLogs();
}
(*logSink)(logLine);
} else if (type == resSetPhase && ! fields.is_null()) {
@@ -1405,16 +1459,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data
} else
currentHookLine += c;
}
-
- return StillAlive{};
+} catch (...) {
+ co_return std::current_exception();
}
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept // builds the promise that consumes builder (and hook) output, raced against the build timeout when enabled
+try {
+ assert(builderOutFD);
+
+ auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get());
+ kj::Own<InputStream> hookIn; // stays null when there is no hook
+ if (hook) {
+ hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get());
+ }
+
+ auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); // attach() moves both stream objects into the promise
-void DerivationGoal::handleEOF(int fd)
+ if (respectsTimeouts() && settings.buildTimeout != 0) {
+ handlers = handlers.exclusiveJoin( // race the stream handlers against the build-timeout timer
+ worker.aio.provider->getTimer()
+ .afterDelay(settings.buildTimeout.get() * kj::SECONDS)
+ .then([this]() -> Outcome<void, Finished> {
+ return timedOut(
+ Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
+ );
+ })
+ );
+ }
+
+ return handlers.then([this](auto r) -> Outcome<void, Finished> {
+ if (!currentLogLine.empty()) flushLine(); // flush whatever is still buffered in currentLogLine
+ return r; // pass the handlers' outcome through unchanged
+ });
+} catch (...) {
+ return {std::current_exception()}; // setup failures are returned through the promise instead of being thrown
+}
+
+kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept // completes with timedOut(...) once lastChildActivity is unchanged across a full maxSilentTime wait
{
- if (!currentLogLine.empty()) flushLine();
+ while (true) {
+ const auto stash = lastChildActivity; // snapshot to compare against after the wait
+ auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS;
+ co_await worker.aio.provider->getTimer().atTime(waitUntil);
+ if (lastChildActivity == stash) { // unchanged during the wait; otherwise loop with a new deadline
+ co_return timedOut(
+ Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime)
+ );
+ }
+ }
}
+kj::Promise<Outcome<void, Goal::Finished>> // joins the builder/hook output handlers and, when enabled, the silence monitor
+DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
+{
+ lastChildActivity = worker.aio.provider->getTimer().now(); // the silence window starts now
+
+ auto handlers = kj::joinPromisesFailFast([&] {
+ kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2};
+
+ parts.add(handleBuilderOutput(builderIn));
+ if (hookIn) { // hookIn may be null; the hook handler is only added when a stream is given
+ parts.add(handleHookOutput(*hookIn));
+ }
+
+ return parts.releaseAsArray();
+ }());
+
+ if (respectsTimeouts() && settings.maxSilentTime != 0) {
+ handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) {
+ return kj::arr(std::move(r)); // wrap the single result in an array so both sides of the join have the same type
+ }));
+ }
+
+ for (auto r : co_await handlers) {
+ BOOST_OUTCOME_CO_TRYV(r); // NOTE(review): presumably co_returns early when r is not a success; confirm against Boost.Outcome docs
+ }
+ co_return result::success();
+}
void DerivationGoal::flushLine()
{
diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh
index 020388d5a..46b07fc0b 100644
--- a/src/libstore/build/derivation-goal.hh
+++ b/src/libstore/build/derivation-goal.hh
@@ -8,6 +8,7 @@
#include "store-api.hh"
#include "pathlocks.hh"
#include "goal.hh"
+#include <kj/time.h>
namespace nix {
@@ -17,7 +18,7 @@ struct HookInstance;
struct HookReplyBase {
struct [[nodiscard]] Accept {
- std::set<int> fds;
+ kj::Promise<Outcome<void, Goal::Finished>> promise;
};
struct [[nodiscard]] Decline {};
struct [[nodiscard]] Postpone {};
@@ -70,6 +71,8 @@ struct InitialOutput {
*/
struct DerivationGoal : public Goal
{
+ struct InputStream;
+
/**
* Whether to use an on-disk .drv file.
*/
@@ -242,7 +245,7 @@ struct DerivationGoal : public Goal
BuildMode buildMode = bmNormal);
virtual ~DerivationGoal() noexcept(false);
- Finished timedOut(Error && ex) override;
+ Finished timedOut(Error && ex);
std::string key() override;
@@ -312,13 +315,19 @@ struct DerivationGoal : public Goal
virtual void cleanupPostOutputsRegisteredModeCheck();
virtual void cleanupPostOutputsRegisteredModeNonCheck();
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
- void handleEOF(int fd) override;
+protected:
+ kj::TimePoint lastChildActivity = kj::minValue;
+
+ kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept;
+ kj::Promise<Outcome<void, Finished>>
+ handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept;
+ kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept;
+ kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept;
+ Finished tooMuchLogs();
void flushLine();
+public:
/**
* Wrappers around the corresponding Store methods that first consult the
* derivation. This is currently needed because when there is no drv file
@@ -357,6 +366,11 @@ struct DerivationGoal : public Goal
void waiteeDone(GoalPtr waitee) override;
+ virtual bool respectsTimeouts() // default answer is false; virtual, so a subclass can answer differently
+ {
+ return false;
+ }
+
StorePathSet exportReferences(const StorePathSet & storePaths);
JobCategory jobCategory() const override {
diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc
index 7986123cc..fdee53699 100644
--- a/src/libstore/build/drv-output-substitution-goal.cc
+++ b/src/libstore/build/drv-output-substitution-goal.cc
@@ -69,24 +69,26 @@ try {
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>();
- downloadState->outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ downloadState->outPipe = kj::mv(pipe.fulfiller);
downloadState->result =
std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] {
+ Finally updateStats([&]() { downloadState->outPipe->fulfill(); });
ReceiveInterrupts receiveInterrupts;
- Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
return sub->queryRealisation(id);
});
state = &DrvOutputSubstitutionGoal::realisationFetched;
- return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}};
+ return {WaitForWorld{
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ }};
} catch (...) {
return {std::current_exception()};
}
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
try {
- worker.childTerminated(this);
maintainRunningSubstitutions.reset();
try {
diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh
index f33196665..a35bf67ee 100644
--- a/src/libstore/build/drv-output-substitution-goal.hh
+++ b/src/libstore/build/drv-output-substitution-goal.hh
@@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal {
struct DownloadState
{
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
std::future<std::shared_ptr<const Realisation>> result;
};
@@ -74,8 +74,6 @@ public:
kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished() noexcept;
- Finished timedOut(Error && ex) override { abort(); };
-
std::string key() override;
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh
index 189505308..3f6e8396e 100644
--- a/src/libstore/build/goal.hh
+++ b/src/libstore/build/goal.hh
@@ -114,6 +114,8 @@ struct Goal
public:
+ struct Finished;
+
struct [[nodiscard]] StillAlive {};
struct [[nodiscard]] WaitForSlot {};
struct [[nodiscard]] WaitForAWhile {};
@@ -122,7 +124,7 @@ public:
Goals goals;
};
struct [[nodiscard]] WaitForWorld {
- std::set<int> fds;
+ kj::Promise<Outcome<void, Finished>> promise;
bool inBuildSlot;
};
struct [[nodiscard]] Finished {
@@ -167,20 +169,6 @@ public:
virtual void waiteeDone(GoalPtr waitee) { }
- virtual WorkResult handleChildOutput(int fd, std::string_view data)
- {
- abort();
- }
-
- virtual void handleEOF(int fd)
- {
- }
-
- virtual bool respectsTimeouts()
- {
- return false;
- }
-
void trace(std::string_view s);
std::string getName() const
@@ -188,13 +176,6 @@ public:
return name;
}
- /**
- * Callback in case of a timeout. It should wake up its waiters,
- * get rid of any running child processes that are being monitored
- * by the worker (important!), etc.
- */
- virtual Finished timedOut(Error && ex) = 0;
-
virtual std::string key() = 0;
virtual void cleanup() { }
diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc
index f14d09652..040fa7461 100644
--- a/src/libstore/build/local-derivation-goal.cc
+++ b/src/libstore/build/local-derivation-goal.cc
@@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore()
void LocalDerivationGoal::killChild()
{
if (pid) {
- worker.childTerminated(this);
-
/* If we're using a build user, then there is a tricky race
condition: if we kill the build user before the child has
done its setuid() to the build user uid, then it won't be
@@ -243,14 +241,14 @@ try {
try {
/* Okay, we have to build. */
- auto fds = startBuilder();
+ auto promise = startBuilder();
/* This state will be reached when we get EOF on the child's
log pipe. */
state = &DerivationGoal::buildDone;
started();
- return {WaitForWorld{std::move(fds), true}};
+ return {WaitForWorld{std::move(promise), true}};
} catch (BuildError & e) {
outputLocks.unlock();
@@ -390,7 +388,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
cleanupPostOutputsRegisteredModeCheck();
}
-std::set<int> LocalDerivationGoal::startBuilder()
+// NOTE this one isn't noexcept because it's called from places that expect
+// exceptions to signal failure to launch. we should change this some time.
+kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder()
{
if ((buildUser && buildUser->getUIDCount() != 1)
#if __linux__
@@ -779,7 +779,7 @@ std::set<int> LocalDerivationGoal::startBuilder()
msgs.push_back(std::move(msg));
}
- return {builderOutPTY.get()};
+ return handleChildOutput();
}
diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh
index cd040bc15..6239129ab 100644
--- a/src/libstore/build/local-derivation-goal.hh
+++ b/src/libstore/build/local-derivation-goal.hh
@@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal
/**
* Start building a derivation.
*/
- std::set<int> startBuilder();
+ kj::Promise<Outcome<void, Finished>> startBuilder();
/**
* Fill in the environment for the builder.
diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc
index bd0ffcb9b..058f858d4 100644
--- a/src/libstore/build/substitution-goal.cc
+++ b/src/libstore/build/substitution-goal.cc
@@ -208,16 +208,17 @@ try {
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
- outPipe.create();
+ auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>();
+ outPipe = kj::mv(pipe.fulfiller);
thr = std::async(std::launch::async, [this]() {
+ /* Wake up the worker loop when we're done. */
+ Finally updateStats([this]() { outPipe->fulfill(); });
+
auto & fetchPath = subPath ? *subPath : storePath;
try {
ReceiveInterrupts receiveInterrupts;
- /* Wake up the worker loop when we're done. */
- Finally updateStats([this]() { outPipe.writeSide.close(); });
-
Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()});
PushActivity pact(act.id);
@@ -234,7 +235,9 @@ try {
});
state = &PathSubstitutionGoal::finished;
- return {WaitForWorld{{outPipe.readSide.get()}, true}};
+ return {WaitForWorld{
+ pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
+ }};
} catch (...) {
return {std::current_exception()};
}
@@ -244,8 +247,6 @@ kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuil
try {
trace("substitute finished");
- worker.childTerminated(this);
-
try {
thr.get();
} catch (std::exception & e) {
@@ -288,22 +289,13 @@ try {
}
-Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data)
-{
- return StillAlive{};
-}
-
-
void PathSubstitutionGoal::cleanup()
{
try {
if (thr.valid()) {
// FIXME: signal worker thread to quit.
thr.get();
- worker.childTerminated(this);
}
-
- outPipe.close();
} catch (...) {
ignoreException();
}
diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh
index 3c97b19fd..91e256fd7 100644
--- a/src/libstore/build/substitution-goal.hh
+++ b/src/libstore/build/substitution-goal.hh
@@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal
/**
* Pipe for the substituter's standard output.
*/
- Pipe outPipe;
+ kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe;
/**
* The substituter thread.
@@ -90,8 +90,6 @@ public:
);
~PathSubstitutionGoal();
- Finished timedOut(Error && ex) override { abort(); };
-
/**
* We prepend "a$" to the key name to ensure substitution goals
* happen before derivation goals.
@@ -112,11 +110,6 @@ public:
kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept;
kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept;
- /**
- * Callback used by the worker to write to the log.
- */
- WorkResult handleChildOutput(int fd, std::string_view data) override;
-
/* Called by destructor, can't be overridden */
void cleanup() override final;
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..284adbc50 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -7,10 +7,19 @@
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-#include <poll.h>
-
namespace nix {
+namespace {
+struct ErrorHandler : kj::TaskSet::ErrorHandler // handler type for kj::TaskSet failures in this file
+{
+ void taskFailed(kj::Exception && e) override
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr());
+ abort(); // log first, then terminate the process
+ }
+} errorHandler; // single file-local instance
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,6 +27,7 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
@@ -33,6 +43,7 @@ Worker::~Worker()
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
topGoals.clear();
+ children.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -209,7 +220,9 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
dep->waiters.insert(goal);
}
},
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
+ [&](Goal::WaitForWorld & w) {
+ childStarted(goal, std::move(w.promise), w.inBuildSlot);
+ },
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
@@ -244,16 +257,22 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
+void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
bool inBuildSlot)
{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
+ children.add(promise
+ .then([this, goal](auto result) {
+ if (result.has_value()) {
+ handleWorkResult(goal, Goal::ContinueImmediately{});
+ } else if (result.has_error()) {
+ handleWorkResult(goal, std::move(result.assume_error()));
+ } else {
+ childException = result.assume_exception();
+ }
+ })
+ .attach(Finally{[this, goal, inBuildSlot] {
+ childTerminated(goal, inBuildSlot);
+ }}));
if (inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
@@ -269,13 +288,13 @@ void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
}
-void Worker::childTerminated(Goal * goal)
+void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
+ if (childFinished) {
+ childFinished->fulfill();
+ }
- if (i->inBuildSlot) {
+ if (inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
assert(nrSubstitutions > 0);
@@ -290,8 +309,6 @@ void Worker::childTerminated(Goal * goal)
}
}
- children.erase(i);
-
/* Wake up goals waiting for a build slot. */
for (auto & j : wantingToBuild) {
GoalPtr goal = j.lock();
@@ -390,11 +407,15 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
+ if (!children.isEmpty() || !waitingForAWhile.empty())
waitForInput();
else {
assert(!awake.empty());
}
+
+ if (childException) {
+ std::rethrow_exception(childException);
+ }
}
/* If --keep-going is not set, it's possible that the main goal
@@ -402,7 +423,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
return _topGoals;
}
@@ -411,139 +432,51 @@ void Worker::waitForInput()
{
printMsg(lvlVomit, "waiting for children");
+ auto childFinished = [&]{
+ auto pair = kj::newPromiseAndFulfiller<void>();
+ this->childFinished = kj::mv(pair.fulfiller);
+ return kj::mv(pair.promise);
+ }();
+
/* Process output from the file descriptors attached to the
children, namely log output and output path creation commands.
We also use this to detect child termination: if we get EOF on
the logger pipe of a build, we assume that the builder has
terminated. */
- bool useTimeout = false;
- long timeout = 0;
+ std::optional<long> timeout = 0;
auto before = steady_time_point::clock::now();
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
+ // Periodically wake up to see if we need to run the garbage collector.
+ if (settings.minFree.get() != 0) {
+ timeout = 10;
}
/* If we are polling goals that are waiting for a lock, then wake
up after a few seconds at most. */
if (!waitingForAWhile.empty()) {
- useTimeout = true;
if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
timeout = std::max(1L,
(long) std::chrono::duration_cast<std::chrono::seconds>(
lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
} else lastWokenUp = steady_time_point::min();
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
-
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
+ if (timeout)
+ vomit("sleeping %d seconds", *timeout);
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
+ auto waitFor = [&] {
+ if (timeout) {
+ return aio.provider->getTimer()
+ .afterDelay(*timeout * kj::SECONDS)
+ .exclusiveJoin(kj::mv(childFinished));
+ } else {
+ return std::move(childFinished);
}
+ }();
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
+ waitFor.wait(aio.waitScope);
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
- }
+ auto after = steady_time_point::clock::now();
if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
lastWokenUp = after;
diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh
index 6735ea0b9..37d80ba7b 100644
--- a/src/libstore/build/worker.hh
+++ b/src/libstore/build/worker.hh
@@ -21,24 +21,6 @@ class DrvOutputSubstitutionGoal;
typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point;
-/**
- * A mapping used to remember for each child process to what goal it
- * belongs, and file descriptors for receiving log data and output
- * path creation commands.
- */
-struct Child
-{
- WeakGoalPtr goal;
- Goal * goal2; // ugly hackery
- std::set<int> fds;
- bool inBuildSlot;
- /**
- * Time we last got output on stdout/stderr
- */
- steady_time_point lastOutput;
- steady_time_point timeStarted;
-};
-
/* Forward definition. */
struct HookInstance;
@@ -117,11 +99,6 @@ private:
WeakGoals wantingToBuild;
/**
- * Child processes currently running.
- */
- std::list<Child> children;
-
- /**
* Number of build slots occupied. This includes local builds but does not
* include substitutions or remote builds via the build hook.
*/
@@ -179,6 +156,8 @@ private:
void goalFinished(GoalPtr goal, Goal::Finished & f);
void handleWorkResult(GoalPtr goal, Goal::WorkResult how);
+ kj::Own<kj::PromiseFulfiller<void>> childFinished;
+
/**
* Put `goal` to sleep until a build slot becomes available (which
* might be right away).
@@ -212,10 +191,15 @@ private:
* Registers a running child process. `inBuildSlot` means that
* the process counts towards the jobs limit.
*/
- void childStarted(GoalPtr goal, const std::set<int> & fds,
+ void childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
bool inBuildSlot);
/**
+ * Unregisters a running child process.
+ */
+ void childTerminated(GoalPtr goal, bool inBuildSlot);
+
+ /**
* Pass current stats counters to the logger for progress bar updates.
*/
void updateStatistics();
@@ -240,6 +224,11 @@ public:
Store & evalStore;
kj::AsyncIoContext & aio;
+private:
+ kj::TaskSet children;
+ std::exception_ptr childException;
+
+public:
struct HookState {
std::unique_ptr<HookInstance> instance;
@@ -303,11 +292,6 @@ private:
public:
/**
- * Unregisters a running child process.
- */
- void childTerminated(Goal * goal);
-
- /**
* Loop until the specified top-level goals have finished.
*/
Goals run(std::function<Goals (GoalFactory &)> req);