about | summary | refs | log | tree | commit | diff
path: root/src/libstore/build/worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r-- src/libstore/build/worker.cc 193
1 files changed, 63 insertions, 130 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc
index ee45c7e3f..284adbc50 100644
--- a/src/libstore/build/worker.cc
+++ b/src/libstore/build/worker.cc
@@ -7,10 +7,19 @@
#include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep
-#include <poll.h>
-
namespace nix {
+namespace { // file-local: nothing in here is visible outside this translation unit
+struct ErrorHandler : kj::TaskSet::ErrorHandler // error callback used by the Worker's `children` task set
+{
+ void taskFailed(kj::Exception && e) override // receives the exception of a task that failed
+ {
+ printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); // log the stringified exception
+ abort(); // treated as unrecoverable: terminate the process right after logging
+ }
+} errorHandler; // single instance, handed to `children` in the Worker constructor
+}
+
Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
: act(*logger, actRealise)
, actDerivations(*logger, actBuilds)
@@ -18,6 +27,7 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
, store(store)
, evalStore(evalStore)
, aio(aio)
+ , children(errorHandler)
{
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
@@ -33,6 +43,7 @@ Worker::~Worker()
are in trouble, since goals may call childTerminated() etc. in
their destructors). */
topGoals.clear();
+ children.clear();
assert(expectedSubstitutions == 0);
assert(expectedDownloadSize == 0);
@@ -209,7 +220,9 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
dep->waiters.insert(goal);
}
},
- [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); },
+ [&](Goal::WaitForWorld & w) {
+ childStarted(goal, std::move(w.promise), w.inBuildSlot);
+ },
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
@@ -244,16 +257,22 @@ void Worker::wakeUp(GoalPtr goal)
}
-void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
+void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
bool inBuildSlot)
{
- Child child;
- child.goal = goal;
- child.goal2 = goal.get();
- child.fds = fds;
- child.timeStarted = child.lastOutput = steady_time_point::clock::now();
- child.inBuildSlot = inBuildSlot;
- children.emplace_back(child);
+ children.add(promise
+ .then([this, goal](auto result) {
+ if (result.has_value()) {
+ handleWorkResult(goal, Goal::ContinueImmediately{});
+ } else if (result.has_error()) {
+ handleWorkResult(goal, std::move(result.assume_error()));
+ } else {
+ childException = result.assume_exception();
+ }
+ })
+ .attach(Finally{[this, goal, inBuildSlot] {
+ childTerminated(goal, inBuildSlot);
+ }}));
if (inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
@@ -269,13 +288,13 @@ void Worker::childStarted(GoalPtr goal, const std::set<int> & fds,
}
-void Worker::childTerminated(Goal * goal)
+void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
{
- auto i = std::find_if(children.begin(), children.end(),
- [&](const Child & child) { return child.goal2 == goal; });
- if (i == children.end()) return;
+ if (childFinished) {
+ childFinished->fulfill();
+ }
- if (i->inBuildSlot) {
+ if (inBuildSlot) {
switch (goal->jobCategory()) {
case JobCategory::Substitution:
assert(nrSubstitutions > 0);
@@ -290,8 +309,6 @@ void Worker::childTerminated(Goal * goal)
}
}
- children.erase(i);
-
/* Wake up goals waiting for a build slot. */
for (auto & j : wantingToBuild) {
GoalPtr goal = j.lock();
@@ -390,11 +407,15 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
- if (!children.empty() || !waitingForAWhile.empty())
+ if (!children.isEmpty() || !waitingForAWhile.empty())
waitForInput();
else {
assert(!awake.empty());
}
+
+ if (childException) {
+ std::rethrow_exception(childException);
+ }
}
/* If --keep-going is not set, it's possible that the main goal
@@ -402,7 +423,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
--keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || awake.empty());
assert(!settings.keepGoing || wantingToBuild.empty());
- assert(!settings.keepGoing || children.empty());
+ assert(!settings.keepGoing || children.isEmpty());
return _topGoals;
}
@@ -411,139 +432,51 @@ void Worker::waitForInput()
{
printMsg(lvlVomit, "waiting for children");
+ auto childFinished = [&]{
+ auto pair = kj::newPromiseAndFulfiller<void>();
+ this->childFinished = kj::mv(pair.fulfiller);
+ return kj::mv(pair.promise);
+ }();
+
/* Process output from the file descriptors attached to the
children, namely log output and output path creation commands.
We also use this to detect child termination: if we get EOF on
the logger pipe of a build, we assume that the builder has
terminated. */
- bool useTimeout = false;
- long timeout = 0;
+ std::optional<long> timeout = 0;
auto before = steady_time_point::clock::now();
- /* If we're monitoring for silence on stdout/stderr, or if there
- is a build timeout, then wait for input until the first
- deadline for any child. */
- auto nearest = steady_time_point::max(); // nearest deadline
- if (settings.minFree.get() != 0)
- // Periodicallty wake up to see if we need to run the garbage collector.
- nearest = before + std::chrono::seconds(10);
- for (auto & i : children) {
- if (auto goal = i.goal.lock()) {
- if (!goal->respectsTimeouts()) continue;
- if (0 != settings.maxSilentTime)
- nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime));
- if (0 != settings.buildTimeout)
- nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout));
- }
- }
- if (nearest != steady_time_point::max()) {
- timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count());
- useTimeout = true;
+ // Periodically wake up to see if we need to run the garbage collector.
+ if (settings.minFree.get() != 0) {
+ timeout = 10;
}
/* If we are polling goals that are waiting for a lock, then wake
up after a few seconds at most. */
if (!waitingForAWhile.empty()) {
- useTimeout = true;
if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
timeout = std::max(1L,
(long) std::chrono::duration_cast<std::chrono::seconds>(
lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
} else lastWokenUp = steady_time_point::min();
- if (useTimeout)
- vomit("sleeping %d seconds", timeout);
-
- /* Use select() to wait for the input side of any logger pipe to
- become `available'. Note that `available' (i.e., non-blocking)
- includes EOF. */
- std::vector<struct pollfd> pollStatus;
- std::map<int, size_t> fdToPollStatus;
- for (auto & i : children) {
- for (auto & j : i.fds) {
- pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN });
- fdToPollStatus[j] = pollStatus.size() - 1;
- }
- }
-
- if (poll(pollStatus.data(), pollStatus.size(),
- useTimeout ? timeout * 1000 : -1) == -1) {
- if (errno == EINTR) return;
- throw SysError("waiting for input");
- }
-
- auto after = steady_time_point::clock::now();
-
- /* Process all available file descriptors. FIXME: this is
- O(children * fds). */
- decltype(children)::iterator i;
- for (auto j = children.begin(); j != children.end(); j = i) {
- i = std::next(j);
-
- checkInterrupt();
+ if (timeout)
+ vomit("sleeping %d seconds", *timeout);
- GoalPtr goal = j->goal.lock();
- assert(goal);
-
- if (!goal->exitCode.has_value() &&
- 0 != settings.maxSilentTime &&
- goal->respectsTimeouts() &&
- after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime))
- {
- handleWorkResult(
- goal,
- goal->timedOut(Error(
- "%1% timed out after %2% seconds of silence",
- goal->getName(),
- settings.maxSilentTime
- ))
- );
- continue;
+ auto waitFor = [&] {
+ if (timeout) {
+ return aio.provider->getTimer()
+ .afterDelay(*timeout * kj::SECONDS)
+ .exclusiveJoin(kj::mv(childFinished));
+ } else {
+ return std::move(childFinished);
}
+ }();
- else if (!goal->exitCode.has_value() &&
- 0 != settings.buildTimeout &&
- goal->respectsTimeouts() &&
- after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout))
- {
- handleWorkResult(
- goal,
- goal->timedOut(
- Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)
- )
- );
- continue;
- }
+ waitFor.wait(aio.waitScope);
- std::set<int> fds2(j->fds);
- std::vector<unsigned char> buffer(4096);
- for (auto & k : fds2) {
- const auto fdPollStatusId = get(fdToPollStatus, k);
- assert(fdPollStatusId);
- assert(*fdPollStatusId < pollStatus.size());
- if (pollStatus.at(*fdPollStatusId).revents) {
- ssize_t rd = ::read(k, buffer.data(), buffer.size());
- // FIXME: is there a cleaner way to handle pt close
- // than EIO? Is this even standard?
- if (rd == 0 || (rd == -1 && errno == EIO)) {
- debug("%1%: got EOF", goal->getName());
- goal->handleEOF(k);
- handleWorkResult(goal, Goal::ContinueImmediately{});
- j->fds.erase(k);
- } else if (rd == -1) {
- if (errno != EINTR)
- throw SysError("%s: read failed", goal->getName());
- } else {
- printMsg(lvlVomit, "%1%: read %2% bytes",
- goal->getName(), rd);
- std::string_view data(charptr_cast<char *>(buffer.data()), rd);
- j->lastOutput = after;
- handleWorkResult(goal, goal->handleChildOutput(k, data));
- }
- }
- }
- }
+ auto after = steady_time_point::clock::now();
if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
lastWokenUp = after;