diff options
Diffstat (limited to 'src/libstore/build/derivation-goal.cc')
-rw-r--r-- | src/libstore/build/derivation-goal.cc | 177 |
1 files changed, 149 insertions, 28 deletions
diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 827c9f541..f40611b31 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -11,7 +11,10 @@ #include "drv-output-substitution-goal.hh" #include "strings.hh" +#include <boost/outcome/try.hpp> #include <fstream> +#include <kj/async-unix.h> +#include <kj/debug.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> @@ -780,7 +783,7 @@ try { buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return WaitForWorld{std::move(a.fds), false}; + return WaitForWorld{std::move(a.promise), false}; }, [&](HookReply::Postpone) -> std::optional<WorkResult> { /* Not now; wait until at least one child finishes or @@ -992,9 +995,6 @@ try { buildResult.timesBuilt++; buildResult.stopTime = time(0); - /* So the child is gone now. */ - worker.childTerminated(this); - /* Close the read side of the logger pipe. */ closeReadPipes(); @@ -1266,12 +1266,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set<int> fds; - fds.insert(hook->fromHook.get()); - fds.insert(hook->builderOut.get()); builderOutFD = &hook->builderOut; - - return HookReply::Accept{std::move(fds)}; + return HookReply::Accept{handleChildOutput()}; } @@ -1331,23 +1327,69 @@ void DerivationGoal::closeLogFile() } -Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data) +Goal::Finished DerivationGoal::tooMuchLogs() { - assert(builderOutFD); + killChild(); + return done( + BuildResult::LogLimitExceeded, {}, + Error("%s killed after writing more than %d bytes of log output", + getName(), settings.maxLogSize)); +} - auto tooMuchLogs = [&] { - killChild(); - return done( - BuildResult::LogLimitExceeded, {}, - Error("%s killed after writing more than %d bytes of log output", - getName(), settings.maxLogSize)); - }; +struct DerivationGoal::InputStream final : private kj::AsyncObject +{ + int fd; + kj::UnixEventPort::FdObserver observer; + + InputStream(kj::UnixEventPort & ep, int fd) + : fd(fd) + , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ) + { + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + throw SysError("fcntl(F_GETFL) failed on fd %i", fd); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + throw SysError("fcntl(F_SETFL) failed on fd %i", fd); + } + } + + kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer) + { + const auto res = ::read(fd, buffer.begin(), buffer.size()); + // closing a pty endpoint causes EIO on the other endpoint. stock kj streams + // do not handle this and throw exceptions we can't ask for errno instead :( + // (we can't use `errno` either because kj may well have mangled it by now.) + if (res == 0 || (res == -1 && errno == EIO)) { + return std::string_view{}; + } + + KJ_NONBLOCKING_SYSCALL(res) {} + + if (res > 0) { + return std::string_view{buffer.begin(), static_cast<size_t>(res)}; + } + + return observer.whenBecomesReadable().then([this, buffer] { + return read(buffer); + }); + } +}; + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - // local & `ssh://`-builds are dealt with here. - if (fd == builderOutFD->get()) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } for (auto c : data) @@ -1362,10 +1404,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } if (logSink) (*logSink)(data); - return StillAlive{}; } +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - if (hook && fd == hook->fromHook.get()) { for (auto c : data) if (c == '\n') { auto json = parseJSONMessage(currentHookLine); @@ -1381,7 +1435,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data (fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n"; logSize += logLine.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } (*logSink)(logLine); } else if (type == resSetPhase && ! fields.is_null()) { @@ -1405,16 +1459,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } else currentHookLine += c; } - - return StillAlive{}; +} catch (...) { + co_return std::current_exception(); } +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept +try { + assert(builderOutFD); + + auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get()); + kj::Own<InputStream> hookIn; + if (hook) { + hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get()); + } + + auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); -void DerivationGoal::handleEOF(int fd) + if (respectsTimeouts() && settings.buildTimeout != 0) { + handlers = handlers.exclusiveJoin( + worker.aio.provider->getTimer() + .afterDelay(settings.buildTimeout.get() * kj::SECONDS) + .then([this]() -> Outcome<void, Finished> { + return timedOut( + Error("%1% timed out after %2% seconds", name, settings.buildTimeout) + ); + }) + ); + } + + return handlers.then([this](auto r) -> Outcome<void, Finished> { + if (!currentLogLine.empty()) flushLine(); + return r; + }); +} catch (...) { + return {std::current_exception()}; +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept { - if (!currentLogLine.empty()) flushLine(); + while (true) { + const auto stash = lastChildActivity; + auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS; + co_await worker.aio.provider->getTimer().atTime(waitUntil); + if (lastChildActivity == stash) { + co_return timedOut( + Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime) + ); + } + } } +kj::Promise<Outcome<void, Goal::Finished>> +DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept +{ + lastChildActivity = worker.aio.provider->getTimer().now(); + + auto handlers = kj::joinPromisesFailFast([&] { + kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2}; + + parts.add(handleBuilderOutput(builderIn)); + if (hookIn) { + parts.add(handleHookOutput(*hookIn)); + } + + return parts.releaseAsArray(); + }()); + + if (respectsTimeouts() && settings.maxSilentTime != 0) { + handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) { + return kj::arr(std::move(r)); + })); + } + + for (auto r : co_await handlers) { + BOOST_OUTCOME_CO_TRYV(r); + } + co_return result::success(); +} void DerivationGoal::flushLine() { |