diff options
Diffstat (limited to 'src/libstore/build/worker.cc')
-rw-r--r-- | src/libstore/build/worker.cc | 193 |
1 files changed, 63 insertions, 130 deletions
diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..284adbc50 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -7,10 +7,19 @@ #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep -#include <poll.h> - namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,6 +27,7 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ nrLocalBuilds = 0; @@ -33,6 +43,7 @@ Worker::~Worker() are in trouble, since goals may call childTerminated() etc. in their destructors). */ topGoals.clear(); + children.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -209,7 +220,9 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) dep->waiters.insert(goal); } }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, + [&](Goal::WaitForWorld & w) { + childStarted(goal, std::move(w.promise), w.inBuildSlot); + }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how @@ -244,16 +257,22 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, +void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise, bool inBuildSlot) { - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); + children.add(promise + .then([this, goal](auto result) { + if (result.has_value()) { + handleWorkResult(goal, Goal::ContinueImmediately{}); + } else if (result.has_error()) { + handleWorkResult(goal, std::move(result.assume_error())); + } else { + childException = result.assume_exception(); + } + }) + .attach(Finally{[this, goal, inBuildSlot] { + childTerminated(goal, inBuildSlot); + }})); if (inBuildSlot) { switch (goal->jobCategory()) { case JobCategory::Substitution: @@ -269,13 +288,13 @@ void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, } -void Worker::childTerminated(Goal * goal) +void Worker::childTerminated(GoalPtr goal, bool inBuildSlot) { - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; + if (childFinished) { + childFinished->fulfill(); + } - if (i->inBuildSlot) { + if (inBuildSlot) { switch (goal->jobCategory()) { case JobCategory::Substitution: assert(nrSubstitutions > 0); @@ -290,8 +309,6 @@ void Worker::childTerminated(Goal * goal) } } - children.erase(i); - /* Wake up goals waiting for a build slot. */ for (auto & j : wantingToBuild) { GoalPtr goal = j.lock(); @@ -390,11 +407,15 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) + if (!children.isEmpty() || !waitingForAWhile.empty()) waitForInput(); else { assert(!awake.empty()); } + + if (childException) { + std::rethrow_exception(childException); + } } /* If --keep-going is not set, it's possible that the main goal @@ -402,7 +423,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) --keep-going *is* set, then they must all be finished now. */ assert(!settings.keepGoing || awake.empty()); assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); return _topGoals; } @@ -411,139 +432,51 @@ void Worker::waitForInput() { printMsg(lvlVomit, "waiting for children"); + auto childFinished = [&]{ + auto pair = kj::newPromiseAndFulfiller<void>(); + this->childFinished = kj::mv(pair.fulfiller); + return kj::mv(pair.promise); + }(); + /* Process output from the file descriptors attached to the children, namely log output and output path creation commands. We also use this to detect child termination: if we get EOF on the logger pipe of a build, we assume that the builder has terminated. */ - bool useTimeout = false; - long timeout = 0; + std::optional<long> timeout = 0; auto before = steady_time_point::clock::now(); - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count()); - useTimeout = true; + // Periodicallty wake up to see if we need to run the garbage collector. + if (settings.minFree.get() != 0) { + timeout = 10; } /* If we are polling goals that are waiting for a lock, then wake up after a few seconds at most. */ if (!waitingForAWhile.empty()) { - useTimeout = true; if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>( lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); } else lastWokenUp = steady_time_point::min(); - if (useTimeout) - vomit("sleeping %d seconds", timeout); - - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector<struct pollfd> pollStatus; - std::map<int, size_t> fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; - } - } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); - } - - auto after = steady_time_point::clock::now(); - - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); + if (timeout) + vomit("sleeping %d seconds", *timeout); - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; + auto waitFor = [&] { + if (timeout) { + return aio.provider->getTimer() + .afterDelay(*timeout * kj::SECONDS) + .exclusiveJoin(kj::mv(childFinished)); + } else { + return std::move(childFinished); } + }(); - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } + waitFor.wait(aio.waitScope); - std::set<int> fds2(j->fds); - std::vector<unsigned char> buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast<char *>(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } - } + auto after = steady_time_point::clock::now(); if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { lastWokenUp = after; |