diff options
54 files changed, 1690 insertions, 999 deletions
diff --git a/doc/manual/change-authors.yml b/doc/manual/change-authors.yml index e18abada1..60c0924c7 100644 --- a/doc/manual/change-authors.yml +++ b/doc/manual/change-authors.yml @@ -147,3 +147,6 @@ winter: yshui: github: yshui + +zimbatm: + github: zimbatm diff --git a/doc/manual/meson.build b/doc/manual/meson.build index f53d41b5d..38aad55b5 100644 --- a/doc/manual/meson.build +++ b/doc/manual/meson.build @@ -126,20 +126,18 @@ manual = custom_target( 'manual', 'markdown', ], + install_dir : [ + datadir / 'doc/nix', + false, + ], depfile : 'manual.d', env : { 'RUST_LOG': 'info', 'MDBOOK_SUBSTITUTE_SEARCH': meson.current_build_dir() / 'src', }, ) -manual_html = manual[0] manual_md = manual[1] -install_subdir( - manual_html.full_path(), - install_dir : datadir / 'doc/nix', -) - nix_nested_manpages = [ [ 'nix-env', [ diff --git a/doc/manual/rl-next/fetchGit-regression.md b/doc/manual/rl-next/fetchGit-regression.md new file mode 100644 index 000000000..f6b4fb9e5 --- /dev/null +++ b/doc/manual/rl-next/fetchGit-regression.md @@ -0,0 +1,23 @@ +--- +synopsis: restore backwards-compatibility of `builtins.fetchGit` with Nix 2.3 +issues: [5291, 5128] +credits: [ma27] +category: Fixes +--- + +Compatibility with `builtins.fetchGit` from Nix 2.3 has been restored as follows: + +* Until now, each `ref` was prefixed with `refs/heads` unless it starts with `refs/` itself. + + Now, this is not done if the `ref` looks like a commit hash. + +* Specifying `builtins.fetchGit { ref = "a-tag"; /* … */ }` was broken because `refs/heads` was appended. + + Now, the fetcher doesn't turn a ref into `refs/heads/ref`, but into `refs/*/ref`. That way, + the value in `ref` can be either a tag or a branch. + +* The ref resolution happens the same way as in git: + + * If `refs/ref` exists, it's used. + * If a tag `refs/tags/ref` exists, it's used. + * If a branch `refs/heads/ref` exists, it's used. diff --git a/doc/manual/rl-next/nix-fmt-default-argument.md b/doc/manual/rl-next/nix-fmt-default-argument.md new file mode 100644 index 000000000..41b8f85bd --- /dev/null +++ b/doc/manual/rl-next/nix-fmt-default-argument.md @@ -0,0 +1,38 @@ +--- +synopsis: Removing the `.` default argument passed to the `nix fmt` formatter +issues: [] +prs: [11438] +cls: [1902] +category: Breaking Changes +credits: zimbatm +--- + +The underlying formatter no longer receives the ". " default argument when `nix fmt` is called with no arguments. + +This change was necessary as the formatter wasn't able to distinguish between +a user wanting to format the current folder with `nix fmt .` or the generic +`nix fmt`. + +The default behaviour is now the responsibility of the formatter itself, and +allows tools such as treefmt to format the whole tree instead of only the +current directory and below. + +This may cause issues with some formatters: nixfmt, nixpkgs-fmt and alejandra currently format stdin when no arguments are passed. + +Here is a small wrapper example that will restore the previous behaviour for such a formatter: + +```nix +{ + outputs = { self, nixpkgs, systems }: + let + eachSystem = nixpkgs.lib.genAttrs (import systems) (system: nixpkgs.legacyPackages.${system}); + in + { + formatter = eachSystem (pkgs: + pkgs.writeShellScriptBin "formatter" '' + if [[ $# = 0 ]]; set -- .; fi + exec "${pkgs.nixfmt-rfc-style}/bin/nixfmt "$@" + ''); + }; +} +``` diff --git a/doc/manual/rl-next/stack-traces.md b/doc/manual/rl-next/stack-traces.md new file mode 100644 index 000000000..e16d6c886 --- /dev/null +++ b/doc/manual/rl-next/stack-traces.md @@ -0,0 +1,26 @@ +--- +synopsis: "Some Lix crashes now produce reporting instructions and a stack trace, then abort" +cls: [1854] +category: Improvements +credits: jade +--- + +Lix, being a C++ program, can crash in a few kinds of ways. +It can obviously do a memory access violation, which will generate a core dump and thus be relatively debuggable. +But, worse, it could throw an unhandled exception, and, in the past, we would just show the message but not where it comes from, in spite of this always being a bug, since we expect all such errors to be translated to a Lix specific error. +Now the latter kind of bug should print reporting instructions, a rudimentary stack trace and (depending on system configuration) generate a core dump. + +Sample output: + +``` +Lix crashed. This is a bug. We would appreciate if you report it along with what caused it at https://git.lix.systems/lix-project/lix/issues with the following information included: + +Exception: std::runtime_error: test exception +Stack trace: + 0# nix::printStackTrace() in /home/jade/lix/lix3/build/src/nix/../libutil/liblixutil.so + 1# 0x000073C9862331F2 in /home/jade/lix/lix3/build/src/nix/../libmain/liblixmain.so + 2# 0x000073C985F2E21A in /nix/store/p44qan69linp3ii0xrviypsw2j4qdcp2-gcc-13.2.0-lib/lib/libstdc++.so.6 + 3# 0x000073C985F2E285 in /nix/store/p44qan69linp3ii0xrviypsw2j4qdcp2-gcc-13.2.0-lib/lib/libstdc++.so.6 + 4# nix::handleExceptions(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<void ()>) in /home/jade/lix/lix3/build/src/nix/../libmain/liblixmain.so + ... +``` diff --git a/doc/manual/rl-next/verify-tls.md b/doc/manual/rl-next/verify-tls.md new file mode 100644 index 000000000..608f3347e --- /dev/null +++ b/doc/manual/rl-next/verify-tls.md @@ -0,0 +1,10 @@ +--- +synopsis: "`<nix/fetchurl.nix>` now uses TLS verification" +category: Fixes +prs: [11585] +credits: edolstra +--- + +Previously `<nix/fetchurl.nix>` did not do TLS verification. This was because the Nix sandbox in the past did not have access to TLS certificates, and Nix checks the hash of the fetched file anyway. However, this can expose authentication data from `netrc` and URLs to man-in-the-middle attackers. In addition, Nix now in some cases (such as when using impure derivations) does *not* check the hash. Therefore we have now enabled TLS verification. This means that downloads by `<nix/fetchurl.nix>` will now fail if you're fetching from a HTTPS server that does not have a valid certificate. + +`<nix/fetchurl.nix>` is also known as the builtin derivation builder `builtin:fetchurl`. It's not to be confused with the evaluation-time function `builtins.fetchurl`, which was not affected by this issue. @@ -217,7 +217,7 @@ # A Nixpkgs overlay that overrides the 'nix' and # 'nix.perl-bindings' packages. - overlays.default = overlayFor (p: p.stdenv); + overlays.default = overlayFor (p: p.clangStdenv); hydraJobs = { # Binary package for various platforms. diff --git a/meson.build b/meson.build index f89f5a016..a8c6b9621 100644 --- a/meson.build +++ b/meson.build @@ -47,6 +47,7 @@ # in the build directory. project('lix', 'cpp', 'rust', + meson_version : '>=1.4.0', version : run_command('bash', '-c', 'echo -n $(jq -r .version < ./version.json)$VERSION_SUFFIX', check : true).stdout().strip(), default_options : [ 'cpp_std=c++2a', @@ -492,12 +493,6 @@ add_project_arguments( '-Wdeprecated-copy', '-Wignored-qualifiers', '-Werror=suggest-override', - # Enable assertions in libstdc++ by default. Harmless on libc++. Benchmarked - # at ~1% overhead in `nix search`. - # - # FIXME: remove when we get meson 1.4.0 which will default this to on for us: - # https://mesonbuild.com/Release-notes-for-1-4-0.html#ndebug-setting-now-controls-c-stdlib-assertions - '-D_GLIBCXX_ASSERTIONS=1', language : 'cpp', ) @@ -593,10 +588,10 @@ run_command( ) if is_darwin - configure_file( - input : 'misc/launchd/org.nixos.nix-daemon.plist.in', - output : 'org.nixos.nix-daemon.plist', - copy : true, + fs.copyfile( + 'misc/launchd/org.nixos.nix-daemon.plist.in', + 'org.nixos.nix-daemon.plist', + install : true, install_dir : prefix / 'Library/LaunchDaemons', ) endif diff --git a/misc/bash/meson.build b/misc/bash/meson.build index 75acce2ea..178692536 100644 --- a/misc/bash/meson.build +++ b/misc/bash/meson.build @@ -1,8 +1,7 @@ -configure_file( - input : 'completion.sh', - output : 'nix', +fs.copyfile( + 'completion.sh', + 'nix', install : true, install_dir : datadir / 'bash-completion/completions', install_mode : 'rw-r--r--', - copy : true, ) diff --git a/misc/fish/meson.build b/misc/fish/meson.build index d54de9a13..7f9cd0896 100644 --- a/misc/fish/meson.build +++ b/misc/fish/meson.build @@ -1,8 +1,7 @@ -configure_file( - input : 'completion.fish', - output : 'nix.fish', +fs.copyfile( + 'completion.fish', + 'nix.fish', install : true, install_dir : datadir / 'fish/vendor_completions.d', install_mode : 'rw-r--r--', - copy : true, ) diff --git a/misc/meson.build b/misc/meson.build index bf3c157f7..4e2f6aacf 100644 --- a/misc/meson.build +++ b/misc/meson.build @@ -5,8 +5,4 @@ subdir('zsh') subdir('systemd') subdir('flake-registry') -runinpty = configure_file( - copy : true, - input : meson.current_source_dir() / 'runinpty.py', - output : 'runinpty.py', -) +runinpty = fs.copyfile('runinpty.py') diff --git a/misc/zsh/meson.build b/misc/zsh/meson.build index 8063a5cb8..bd388a31f 100644 --- a/misc/zsh/meson.build +++ b/misc/zsh/meson.build @@ -1,10 +1,9 @@ foreach script : [ [ 'completion.zsh', '_nix' ], [ 'run-help-nix' ] ] - configure_file( - input : script[0], - output : script.get(1, script[0]), + fs.copyfile( + script[0], + script.get(1, script[0]), install : true, install_dir : datadir / 'zsh/site-functions', install_mode : 'rw-r--r--', - copy : true, ) endforeach diff --git a/package.nix b/package.nix index 23ebc3a82..ff862bf92 100644 --- a/package.nix +++ b/package.nix @@ -22,7 +22,6 @@ doxygen, editline-lix ? __forDefaults.editline-lix, editline, - fetchpatch, git, gtest, jq, @@ -100,7 +99,7 @@ (lib.enableFeature (ncurses != null) "termcap") ]; - nativeBuildInputs = (prev.nativeBuildInputs or [ ]) ++ [ ncurses ]; + buildInputs = (prev.buildInputs or [ ]) ++ [ ncurses ]; }); build-release-notes = callPackage ./maintainers/build-release-notes.nix { }; @@ -111,7 +110,7 @@ }: # gcc miscompiles coroutines at least until 13.2, possibly longer -assert stdenv.cc.isClang || lintInsteadOfBuild; +assert stdenv.cc.isClang || lintInsteadOfBuild || internalApiDocs; let inherit (__forDefaults) canRunInstalled; diff --git a/scripts/meson.build b/scripts/meson.build index c916c8efa..e35c6cbb0 100644 --- a/scripts/meson.build +++ b/scripts/meson.build @@ -8,12 +8,7 @@ configure_file( } ) -# https://github.com/mesonbuild/meson/issues/860 -configure_file( - input : 'nix-profile.sh.in', - output : 'nix-profile.sh.in', - copy : true, -) +fs.copyfile('nix-profile.sh.in') foreach rc : [ '.sh', '.fish', '-daemon.sh', '-daemon.fish' ] configure_file( diff --git a/src/libexpr/primops/fetchTree.cc b/src/libexpr/primops/fetchTree.cc index b0e14a26e..c98fe2a03 100644 --- a/src/libexpr/primops/fetchTree.cc +++ b/src/libexpr/primops/fetchTree.cc @@ -394,7 +394,8 @@ static RegisterPrimOp primop_fetchGit({ [Git reference]: https://git-scm.com/book/en/v2/Git-Internals-Git-References By default, the `ref` value is prefixed with `refs/heads/`. - As of 2.3.0, Nix will not prefix `refs/heads/` if `ref` starts with `refs/`. + As of 2.3.0, Nix will not prefix `refs/heads/` if `ref` starts with `refs/` or + if `ref` looks like a commit hash for backwards compatibility with CppNix 2.3. - `submodules` (default: `false`) diff --git a/src/libfetchers/git.cc b/src/libfetchers/git.cc index 7d16d3f57..da60bf331 100644 --- a/src/libfetchers/git.cc +++ b/src/libfetchers/git.cc @@ -1,3 +1,4 @@ +#include "error.hh" #include "fetchers.hh" #include "cache.hh" #include "globals.hh" @@ -257,6 +258,28 @@ std::pair<StorePath, Input> fetchFromWorkdir(ref<Store> store, Input & input, co } } // end namespace +static std::optional<Path> resolveRefToCachePath( + Input & input, + const Path & cacheDir, + std::vector<Path> & gitRefFileCandidates, + std::function<bool(const Path&)> condition) +{ + if (input.getRef()->starts_with("refs/")) { + Path fullpath = cacheDir + "/" + *input.getRef(); + if (condition(fullpath)) { + return fullpath; + } + } + + for (auto & candidate : gitRefFileCandidates) { + if (condition(candidate)) { + return candidate; + } + } + + return std::nullopt; +} + struct GitInputScheme : InputScheme { std::optional<Input> inputFromURL(const ParsedURL & url, bool requireTree) const override @@ -539,10 +562,13 @@ struct GitInputScheme : InputScheme runProgram("git", true, { "-c", "init.defaultBranch=" + gitInitialBranch, "init", "--bare", repoDir }); } - Path localRefFile = - input.getRef()->compare(0, 5, "refs/") == 0 - ? cacheDir + "/" + *input.getRef() - : cacheDir + "/refs/heads/" + *input.getRef(); + std::vector<Path> gitRefFileCandidates; + for (auto & infix : {"", "tags/", "heads/"}) { + Path p = cacheDir + "/refs/" + infix + *input.getRef(); + gitRefFileCandidates.push_back(p); + } + + Path localRefFile; bool doFetch; time_t now = time(0); @@ -564,29 +590,70 @@ struct GitInputScheme : InputScheme if (allRefs) { doFetch = true; } else { - /* If the local ref is older than ‘tarball-ttl’ seconds, do a - git fetch to update the local ref to the remote ref. */ - struct stat st; - doFetch = stat(localRefFile.c_str(), &st) != 0 || - !isCacheFileWithinTtl(now, st); + std::function<bool(const Path&)> condition; + condition = [&now](const Path & path) { + /* If the local ref is older than ‘tarball-ttl’ seconds, do a + git fetch to update the local ref to the remote ref. */ + struct stat st; + return stat(path.c_str(), &st) == 0 && + isCacheFileWithinTtl(now, st); + }; + if (auto result = resolveRefToCachePath( + input, + cacheDir, + gitRefFileCandidates, + condition + )) { + localRefFile = *result; + doFetch = false; + } else { + doFetch = true; + } } } + // When having to fetch, we don't know `localRefFile` yet. + // Because git needs to figure out what we're fetching + // (i.e. is it a rev? a branch? a tag?) if (doFetch) { Activity act(*logger, lvlTalkative, actUnknown, fmt("fetching Git repository '%s'", actualUrl)); - // FIXME: git stderr messes up our progress indicator, so - // we're using --quiet for now. Should process its stderr. + auto ref = input.getRef(); + std::string fetchRef; + if (allRefs) { + fetchRef = "refs/*"; + } else if ( + ref->starts_with("refs/") + || *ref == "HEAD" + || std::regex_match(*ref, revRegex)) + { + fetchRef = *ref; + } else { + fetchRef = "refs/*/" + *ref; + } + try { - auto ref = input.getRef(); - auto fetchRef = allRefs - ? "refs/*" - : ref->compare(0, 5, "refs/") == 0 - ? *ref - : ref == "HEAD" - ? *ref - : "refs/heads/" + *ref; - runProgram("git", true, { "-C", repoDir, "--git-dir", gitDir, "fetch", "--quiet", "--force", "--", actualUrl, fmt("%s:%s", fetchRef, fetchRef) }, true); + Finally finally([&]() { + if (auto p = resolveRefToCachePath( + input, + cacheDir, + gitRefFileCandidates, + pathExists + )) { + localRefFile = *p; + } + }); + + // FIXME: git stderr messes up our progress indicator, so + // we're using --quiet for now. Should process its stderr. + runProgram("git", true, { + "-C", repoDir, + "--git-dir", gitDir, + "fetch", + "--quiet", + "--force", + "--", actualUrl, fmt("%s:%s", fetchRef, fetchRef) + }, true); } catch (Error & e) { if (!pathExists(localRefFile)) throw; warn("could not update local clone of Git repository '%s'; continuing with the most recent version", actualUrl); diff --git a/src/libmain/crash-handler.cc b/src/libmain/crash-handler.cc new file mode 100644 index 000000000..3f1b9f7d8 --- /dev/null +++ b/src/libmain/crash-handler.cc @@ -0,0 +1,41 @@ +#include "crash-handler.hh" +#include "fmt.hh" + +#include <boost/core/demangle.hpp> +#include <exception> + +namespace nix { + +namespace { +void onTerminate() +{ + std::cerr << "Lix crashed. This is a bug. We would appreciate if you report it along with what caused it at https://git.lix.systems/lix-project/lix/issues with the following information included:\n\n"; + try { + std::exception_ptr eptr = std::current_exception(); + if (eptr) { + std::rethrow_exception(eptr); + } else { + std::cerr << "std::terminate() called without exception\n"; + } + } catch (const std::exception & ex) { + std::cerr << "Exception: " << boost::core::demangle(typeid(ex).name()) << ": " << ex.what() << "\n"; + } catch (...) { + std::cerr << "Unknown exception! Spooky.\n"; + } + + std::cerr << "Stack trace:\n"; + nix::printStackTrace(); + + std::abort(); +} +} + +void registerCrashHandler() +{ + // DO NOT use this for signals. Boost stacktrace is very much not + // async-signal-safe, and in a world with ASLR, addr2line is pointless. + // + // If you want signals, set up a minidump system and do it out-of-process. + std::set_terminate(onTerminate); +} +} diff --git a/src/libmain/crash-handler.hh b/src/libmain/crash-handler.hh new file mode 100644 index 000000000..4c5641b8c --- /dev/null +++ b/src/libmain/crash-handler.hh @@ -0,0 +1,21 @@ +#pragma once +/// @file Crash handler for Lix that prints back traces (hopefully in instances where it is not just going to crash the process itself). +/* + * Author's note: This will probably be partially/fully supplanted by a + * minidump writer like the following once we get our act together on crashes a + * little bit more: + * https://github.com/rust-minidump/minidump-writer + * https://github.com/EmbarkStudios/crash-handling + * (out of process implementation *should* be able to be done on-demand) + * + * Such an out-of-process implementation could then both make minidumps and + * print stack traces for arbitrarily messed-up process states such that we can + * safely give out backtraces for SIGSEGV and other deadly signals. + */ + +namespace nix { + +/** Registers the Lix crash handler for std::terminate (currently; will support more crashes later). See also detectStackOverflow(). */ +void registerCrashHandler(); + +} diff --git a/src/libmain/meson.build b/src/libmain/meson.build index a7cce287c..a1a888c16 100644 --- a/src/libmain/meson.build +++ b/src/libmain/meson.build @@ -1,5 +1,6 @@ libmain_sources = files( 'common-args.cc', + 'crash-handler.cc', 'loggers.cc', 'progress-bar.cc', 'shared.cc', @@ -8,6 +9,7 @@ libmain_sources = files( libmain_headers = files( 'common-args.hh', + 'crash-handler.hh', 'loggers.hh', 'progress-bar.hh', 'shared.hh', diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc index bc9548e09..64bd00606 100644 --- a/src/libmain/shared.cc +++ b/src/libmain/shared.cc @@ -1,3 +1,4 @@ +#include "crash-handler.hh" #include "globals.hh" #include "shared.hh" #include "store-api.hh" @@ -118,6 +119,8 @@ static void sigHandler(int signo) { } void initNix() { + registerCrashHandler(); + /* Turn on buffering for cerr. */ static char buf[1024]; std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf)); @@ -335,12 +338,15 @@ int handleExceptions(const std::string & programName, std::function<void()> fun) } catch (BaseError & e) { logError(e.info()); return e.info().status; - } catch (std::bad_alloc & e) { + } catch (const std::bad_alloc & e) { printError(error + "out of memory"); return 1; - } catch (std::exception & e) { - printError(error + e.what()); - return 1; + } catch (const std::exception & e) { + // Random exceptions bubbling into main are cause for bug reports, crash + std::terminate(); + } catch (...) { + // Explicitly do not tolerate non-std exceptions escaping. + std::terminate(); } return 0; diff --git a/src/libmain/shared.hh b/src/libmain/shared.hh index b41efe567..49b72a54e 100644 --- a/src/libmain/shared.hh +++ b/src/libmain/shared.hh @@ -111,7 +111,7 @@ struct PrintFreed /** - * Install a SIGSEGV handler to detect stack overflows. + * Install a SIGSEGV handler to detect stack overflows. See also registerCrashHandler(). */ void detectStackOverflow(); diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 827c9f541..5c0452391 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -11,7 +11,13 @@ #include "drv-output-substitution-goal.hh" #include "strings.hh" +#include <boost/outcome/try.hpp> #include <fstream> +#include <kj/array.h> +#include <kj/async-unix.h> +#include <kj/async.h> +#include <kj/debug.h> +#include <kj/vector.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> @@ -65,7 +71,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, , wantedOutputs(wantedOutputs) , buildMode(buildMode) { - state = &DerivationGoal::getDerivation; name = fmt( "building of '%s' from .drv file", DerivedPath::Built { makeConstantStorePathRef(drvPath), wantedOutputs }.to_string(worker.store)); @@ -85,7 +90,6 @@ DerivationGoal::DerivationGoal(const StorePath & drvPath, const BasicDerivation { this->drv = std::make_unique<Derivation>(drv); - state = &DerivationGoal::haveDerivation; name = fmt( "building of '%s' from in-memory derivation", DerivedPath::Built { makeConstantStorePathRef(drvPath), drv.outputNames() }.to_string(worker.store)); @@ -107,16 +111,6 @@ DerivationGoal::~DerivationGoal() noexcept(false) } -std::string DerivationGoal::key() -{ - /* Ensure that derivations get built in order of their name, - i.e. a derivation named "aardvark" always comes before - "baboon". And substitution goals always happen before - derivation goals (due to "b$"). */ - return "b$" + std::string(drvPath.name()) + "$" + worker.store.printStorePath(drvPath); -} - - void DerivationGoal::killChild() { hook.reset(); @@ -131,9 +125,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex) } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return useDerivation ? getDerivation() : haveDerivation(); } void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) @@ -157,7 +151,7 @@ void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::getDerivation() noexcept try { trace("init"); @@ -165,18 +159,17 @@ try { exists. If it doesn't, it may be created through a substitute. */ if (buildMode == bmNormal && worker.evalStore.isValidPath(drvPath)) { - return loadDerivation(inBuildSlot); + co_return co_await loadDerivation(); } - - state = &DerivationGoal::loadDerivation; - return {WaitForGoals{{worker.goalFactory().makePathSubstitutionGoal(drvPath)}}}; + (co_await waitForGoals(worker.goalFactory().makePathSubstitutionGoal(drvPath))).value(); + co_return co_await loadDerivation(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::loadDerivation() noexcept try { trace("loading derivation"); @@ -207,13 +200,13 @@ try { } assert(drv); - return haveDerivation(inBuildSlot); + return haveDerivation(); } catch (...) { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::haveDerivation() noexcept try { trace("have derivation"); @@ -241,7 +234,7 @@ try { }); } - return gaveUpOnSubstitution(inBuildSlot); + co_return co_await gaveUpOnSubstitution(); } for (auto & i : drv->outputsAndOptPaths(worker.store)) @@ -263,19 +256,19 @@ try { /* If they are all valid, then we're done. */ if (allValid && buildMode == bmNormal) { - return {done(BuildResult::AlreadyValid, std::move(validOutputs))}; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* We are first going to try to create the invalid output paths through substitutes. If that doesn't work, we'll build them. */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; if (settings.useSubstitutes) { if (parsedDrv->substitutesAllowed()) { for (auto & [outputName, status] : initialOutputs) { if (!status.wanted) continue; if (!status.known) - result.goals.insert( + dependencies.add( worker.goalFactory().makeDrvOutputSubstitutionGoal( DrvOutput{status.outputHash, outputName}, buildMode == bmRepair ? Repair : NoRepair @@ -283,7 +276,7 @@ try { ); else { auto * cap = getDerivationCA(*drv); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal( + dependencies.add(worker.goalFactory().makePathSubstitutionGoal( status.known->path, buildMode == bmRepair ? Repair : NoRepair, cap ? std::optional { *cap } : std::nullopt)); @@ -294,17 +287,15 @@ try { } } - if (result.goals.empty()) { /* to prevent hang (no wake-up event) */ - return outputsSubstitutionTried(inBuildSlot); - } else { - state = &DerivationGoal::outputsSubstitutionTried; - return {std::move(result)}; + if (!dependencies.empty()) { /* to prevent hang (no wake-up event) */ + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await outputsSubstitutionTried(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::outputsSubstitutionTried() noexcept try { trace("all outputs substituted (maybe)"); @@ -354,7 +345,7 @@ try { if (needRestart == NeedRestartForMoreOutputs::OutputsAddedDoNeed) { needRestart = NeedRestartForMoreOutputs::OutputsUnmodifedDontNeed; - return haveDerivation(inBuildSlot); + return haveDerivation(); } auto [allValid, validOutputs] = checkPathValidity(); @@ -370,7 +361,7 @@ try { worker.store.printStorePath(drvPath)); /* Nothing to wait for; tail call */ - return gaveUpOnSubstitution(inBuildSlot); + return gaveUpOnSubstitution(); } catch (...) { return {std::current_exception()}; } @@ -378,9 +369,9 @@ try { /* At least one of the output paths could not be produced using a substitute. So we have to build instead. */ -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::gaveUpOnSubstitution() noexcept try { - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; /* At this point we are building all outputs, so if more are wanted there is no need to restart. */ @@ -393,7 +384,7 @@ try { addWaiteeDerivedPath = [&](ref<SingleDerivedPath> inputDrv, const DerivedPathMap<StringSet>::ChildNode & inputNode) { if (!inputNode.value.empty()) - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = inputDrv, .outputs = inputNode.value, @@ -438,17 +429,15 @@ try { if (!settings.useSubstitutes) throw Error("dependency '%s' of '%s' does not exist, and substitution is disabled", worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i)); } - if (result.goals.empty()) {/* to prevent hang (no wake-up event) */ - return inputsRealised(inBuildSlot); - } else { - state = &DerivationGoal::inputsRealised; - return {result}; + if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */ + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await inputsRealised(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } @@ -488,7 +477,7 @@ try { } /* Check each path (slow!). */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (auto & i : outputClosure) { if (worker.pathContentsGood(i)) continue; printError( @@ -496,9 +485,9 @@ try { worker.store.printStorePath(i), worker.store.printStorePath(drvPath)); auto drvPath2 = outputsToDrv.find(i); if (drvPath2 == outputsToDrv.end()) - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i, Repair)); else - result.goals.insert(worker.goalFactory().makeGoal( + dependencies.add(worker.goalFactory().makeGoal( DerivedPath::Built { .drvPath = makeConstantStorePathRef(drvPath2->second), .outputs = OutputsSpec::All { }, @@ -506,18 +495,18 @@ try { bmRepair)); } - if (result.goals.empty()) { - return {done(BuildResult::AlreadyValid, assertPathValidity())}; + if (dependencies.empty()) { + co_return done(BuildResult::AlreadyValid, assertPathValidity()); } - state = &DerivationGoal::closureRepaired; - return {result}; + (co_await waitForGoals(dependencies.releaseAsArray())).value(); + co_return co_await closureRepaired(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::closureRepaired() noexcept try { trace("closure repaired"); if (nrFailed > 0) @@ -529,14 +518,14 @@ try { } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::inputsRealised() noexcept try { trace("all inputs realised"); if (nrFailed != 0) { if (!useDerivation) throw Error("some dependencies of '%s' are missing", worker.store.printStorePath(drvPath)); - return {done( + co_return done( BuildResult::DependencyFailed, {}, Error( @@ -544,12 +533,12 @@ try { nrFailed, worker.store.printStorePath(drvPath) ) - )}; + ); } if (retrySubstitution == RetrySubstitution::YesNeed) { retrySubstitution = RetrySubstitution::AlreadyRetried; - return haveDerivation(inBuildSlot); + co_return co_await haveDerivation(); } /* Gather information necessary for computing the closure and/or @@ -611,11 +600,12 @@ try { worker.store.printStorePath(pathResolved), }); - resolvedDrvGoal = worker.goalFactory().makeDerivationGoal( + auto dependency = worker.goalFactory().makeDerivationGoal( pathResolved, wantedOutputs, buildMode); + resolvedDrvGoal = dependency.first; - state = &DerivationGoal::resolvedFinished; - return {WaitForGoals{{resolvedDrvGoal}}}; + (co_await waitForGoals(std::move(dependency))).value(); + co_return co_await resolvedFinished(); } std::function<void(const StorePath &, const DerivedPathMap<StringSet>::ChildNode &)> accumInputPaths; @@ -679,10 +669,9 @@ try { /* Okay, try to build. Note that here we don't wait for a build slot to become available, since we don't need one if there is a build hook. */ - state = &DerivationGoal::tryToBuild; - return tryToBuild(inBuildSlot); + co_return co_await tryToBuild(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } void DerivationGoal::started() @@ -698,8 +687,9 @@ void DerivationGoal::started() mcRunningBuilds = worker.runningBuilds.addTemporarily(1); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryToBuild() noexcept try { +retry: trace("trying to build"); /* Obtain locks on all output paths, if the paths are known a priori. @@ -733,7 +723,9 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); - return {WaitForAWhile{}}; + (co_await waitForAWhile()).value(); + // we can loop very often, and `co_return co_await` always allocates a new frame + goto retry; } actLock.reset(); @@ -750,7 +742,7 @@ try { if (buildMode != bmCheck && allValid) { debug("skipping build of derivation '%s', someone beat us to it", worker.store.printStorePath(drvPath)); outputLocks.setDeletion(true); - return {done(BuildResult::AlreadyValid, std::move(validOutputs))}; + co_return done(BuildResult::AlreadyValid, std::move(validOutputs)); } /* If any of the outputs already exist but are not valid, delete @@ -770,47 +762,56 @@ try { && settings.maxBuildJobs.get() != 0; if (!buildLocally) { - auto hookReply = tryBuildHook(inBuildSlot); - auto result = std::visit( - overloaded{ - [&](HookReply::Accept & a) -> std::optional<WorkResult> { - /* Yes, it has started doing so. Wait until we get - EOF from the hook. */ - actLock.reset(); - buildResult.startTime = time(0); // inexact - state = &DerivationGoal::buildDone; - started(); - return WaitForWorld{std::move(a.fds), false}; - }, - [&](HookReply::Postpone) -> std::optional<WorkResult> { - /* Not now; wait until at least one child finishes or - the wake-up timeout expires. */ - if (!actLock) - actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, - fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); - outputLocks.unlock(); - return WaitForAWhile{}; - }, - [&](HookReply::Decline) -> std::optional<WorkResult> { - /* We should do it ourselves. */ - return std::nullopt; - }, - }, - hookReply); - if (result) { - return {std::move(*result)}; + auto hookReply = tryBuildHook(); + switch (hookReply.index()) { + case 0: { + HookReply::Accept & a = std::get<0>(hookReply); + /* Yes, it has started doing so. Wait until we get + EOF from the hook. */ + actLock.reset(); + buildResult.startTime = time(0); // inexact + started(); + auto r = co_await a.promise; + if (r.has_value()) { + co_return co_await buildDone(); + } else if (r.has_error()) { + co_return r.assume_error(); + } else { + co_return r.assume_exception(); + } + } + + case 1: { + HookReply::Decline _ [[gnu::unused]] = std::get<1>(hookReply); + break; + } + + case 2: { + HookReply::Postpone _ [[gnu::unused]] = std::get<2>(hookReply); + /* Not now; wait until at least one child finishes or + the wake-up timeout expires. */ + if (!actLock) + actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting, + fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); + outputLocks.unlock(); + (co_await waitForAWhile()).value(); + goto retry; + } + + default: + // can't static_assert this because HookReply *subclasses* variant and std::variant_size breaks + assert(false && "unexpected hook reply"); } } actLock.reset(); - state = &DerivationGoal::tryLocalBuild; - return tryLocalBuild(inBuildSlot); + co_return co_await tryLocalBuild(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::tryLocalBuild() noexcept try { throw Error( "unable to build with a primary store that isn't a local store; " @@ -973,10 +974,11 @@ void runPostBuildHook( proc.getStdout()->drainInto(sink); } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone() noexcept try { trace("build done"); + slotToken = {}; Finally releaseBuildUser([&](){ this->cleanupHookFinally(); }); cleanupPreChildKill(); @@ -992,9 +994,6 @@ try { buildResult.timesBuilt++; buildResult.stopTime = time(0); - /* So the child is gone now. */ - worker.childTerminated(this); - /* Close the read side of the logger pipe. */ closeReadPipes(); @@ -1095,7 +1094,7 @@ try { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DerivationGoal::resolvedFinished() noexcept try { trace("resolved derivation finished"); @@ -1168,7 +1167,7 @@ try { return {std::current_exception()}; } -HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) +HookReply DerivationGoal::tryBuildHook() { if (!worker.hook.available || !useDerivation) return HookReply::Decline{}; @@ -1180,7 +1179,7 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Send the request to the hook. */ worker.hook.instance->sink << "try" - << (inBuildSlot ? 1 : 0) + << (slotToken.valid() ? 1 : 0) << drv->platform << worker.store.printStorePath(drvPath) << parsedDrv->getRequiredSystemFeatures(); @@ -1266,12 +1265,8 @@ HookReply DerivationGoal::tryBuildHook(bool inBuildSlot) /* Create the log file and pipe. */ Path logFile = openLogFile(); - std::set<int> fds; - fds.insert(hook->fromHook.get()); - fds.insert(hook->builderOut.get()); builderOutFD = &hook->builderOut; - - return HookReply::Accept{std::move(fds)}; + return HookReply::Accept{handleChildOutput()}; } @@ -1331,23 +1326,69 @@ void DerivationGoal::closeLogFile() } -Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data) +Goal::Finished DerivationGoal::tooMuchLogs() { - assert(builderOutFD); + killChild(); + return done( + BuildResult::LogLimitExceeded, {}, + Error("%s killed after writing more than %d bytes of log output", + getName(), settings.maxLogSize)); +} - auto tooMuchLogs = [&] { - killChild(); - return done( - BuildResult::LogLimitExceeded, {}, - Error("%s killed after writing more than %d bytes of log output", - getName(), settings.maxLogSize)); - }; +struct DerivationGoal::InputStream final : private kj::AsyncObject +{ + int fd; + kj::UnixEventPort::FdObserver observer; + + InputStream(kj::UnixEventPort & ep, int fd) + : fd(fd) + , observer(ep, fd, kj::UnixEventPort::FdObserver::OBSERVE_READ) + { + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + throw SysError("fcntl(F_GETFL) failed on fd %i", fd); + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + throw SysError("fcntl(F_SETFL) failed on fd %i", fd); + } + } + + kj::Promise<std::string_view> read(kj::ArrayPtr<char> buffer) + { + const auto res = ::read(fd, buffer.begin(), buffer.size()); + // closing a pty endpoint causes EIO on the other endpoint. stock kj streams + // do not handle this and throw exceptions we can't ask for errno instead :( + // (we can't use `errno` either because kj may well have mangled it by now.) + if (res == 0 || (res == -1 && errno == EIO)) { + return std::string_view{}; + } + + KJ_NONBLOCKING_SYSCALL(res) {} + + if (res > 0) { + return std::string_view{buffer.begin(), static_cast<size_t>(res)}; + } + + return observer.whenBecomesReadable().then([this, buffer] { + return read(buffer); + }); + } +}; + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - // local & `ssh://`-builds are dealt with here. - if (fd == builderOutFD->get()) { logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } for (auto c : data) @@ -1362,10 +1403,22 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } if (logSink) (*logSink)(data); - return StillAlive{}; } +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept +try { + auto buf = kj::heapArray<char>(4096); + while (true) { + auto data = co_await in.read(buf); + lastChildActivity = worker.aio.provider->getTimer().now(); + + if (data.empty()) { + co_return result::success(); + } - if (hook && fd == hook->fromHook.get()) { for (auto c : data) if (c == '\n') { auto json = parseJSONMessage(currentHookLine); @@ -1381,7 +1434,7 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data (fields.size() > 0 ? fields[0].get<std::string>() : "") + "\n"; logSize += logLine.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { - return tooMuchLogs(); + co_return tooMuchLogs(); } (*logSink)(logLine); } else if (type == resSetPhase && ! fields.is_null()) { @@ -1405,16 +1458,83 @@ Goal::WorkResult DerivationGoal::handleChildOutput(int fd, std::string_view data } else currentHookLine += c; } - - return StillAlive{}; +} catch (...) { + co_return std::current_exception(); } +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept +try { + assert(builderOutFD); + + auto builderIn = kj::heap<InputStream>(worker.aio.unixEventPort, builderOutFD->get()); + kj::Own<InputStream> hookIn; + if (hook) { + hookIn = kj::heap<InputStream>(worker.aio.unixEventPort, hook->fromHook.get()); + } + + auto handlers = handleChildStreams(*builderIn, hookIn.get()).attach(std::move(builderIn), std::move(hookIn)); -void DerivationGoal::handleEOF(int fd) + if (respectsTimeouts() && settings.buildTimeout != 0) { + handlers = handlers.exclusiveJoin( + worker.aio.provider->getTimer() + .afterDelay(settings.buildTimeout.get() * kj::SECONDS) + .then([this]() -> Outcome<void, Finished> { + return timedOut( + Error("%1% timed out after %2% seconds", name, settings.buildTimeout) + ); + }) + ); + } + + return handlers.then([this](auto r) -> Outcome<void, Finished> { + if (!currentLogLine.empty()) flushLine(); + return r; + }); +} catch (...) { + return {std::current_exception()}; +} + +kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept { - if (!currentLogLine.empty()) flushLine(); + while (true) { + const auto stash = lastChildActivity; + auto waitUntil = lastChildActivity + settings.maxSilentTime.get() * kj::SECONDS; + co_await worker.aio.provider->getTimer().atTime(waitUntil); + if (lastChildActivity == stash) { + co_return timedOut( + Error("%1% timed out after %2% seconds of silence", name, settings.maxSilentTime) + ); + } + } } +kj::Promise<Outcome<void, Goal::Finished>> +DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept +{ + lastChildActivity = worker.aio.provider->getTimer().now(); + + auto handlers = kj::joinPromisesFailFast([&] { + kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2}; + + parts.add(handleBuilderOutput(builderIn)); + if (hookIn) { + parts.add(handleHookOutput(*hookIn)); + } + + return parts.releaseAsArray(); + }()); + + if (respectsTimeouts() && settings.maxSilentTime != 0) { + handlers = handlers.exclusiveJoin(monitorForSilence().then([](auto r) { + return kj::arr(std::move(r)); + })); + } + + for (auto r : co_await handlers) { + BOOST_OUTCOME_CO_TRYV(r); + } + co_return result::success(); +} void DerivationGoal::flushLine() { @@ -1629,5 +1749,4 @@ void DerivationGoal::waiteeDone(GoalPtr waitee) } } } - } diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh index 020388d5a..7505409c0 100644 --- a/src/libstore/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -8,6 +8,7 @@ #include "store-api.hh" #include "pathlocks.hh" #include "goal.hh" +#include <kj/time.h> namespace nix { @@ -17,7 +18,7 @@ struct HookInstance; struct HookReplyBase { struct [[nodiscard]] Accept { - std::set<int> fds; + kj::Promise<Outcome<void, Goal::Finished>> promise; }; struct [[nodiscard]] Decline {}; struct [[nodiscard]] Postpone {}; @@ -70,6 +71,8 @@ struct InitialOutput { */ struct DerivationGoal : public Goal { + struct InputStream; + /** * Whether to use an on-disk .drv file. */ @@ -213,9 +216,6 @@ struct DerivationGoal : public Goal */ std::optional<DerivationType> derivationType; - typedef kj::Promise<Result<WorkResult>> (DerivationGoal::*GoalState)(bool inBuildSlot) noexcept; - GoalState state; - BuildMode buildMode; NotifyingCounter<uint64_t>::Bump mcExpectedBuilds, mcRunningBuilds; @@ -242,11 +242,9 @@ struct DerivationGoal : public Goal BuildMode buildMode = bmNormal); virtual ~DerivationGoal() noexcept(false); - Finished timedOut(Error && ex) override; + Finished timedOut(Error && ex); - std::string key() override; - - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; /** * Add wanted outputs to an already existing derivation goal. @@ -256,23 +254,23 @@ struct DerivationGoal : public Goal /** * The states. */ - kj::Promise<Result<WorkResult>> getDerivation(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> loadDerivation(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> haveDerivation(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> outputsSubstitutionTried(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> gaveUpOnSubstitution(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> closureRepaired(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> inputsRealised(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> tryToBuild(bool inBuildSlot) noexcept; - virtual kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> buildDone(bool inBuildSlot) noexcept; + kj::Promise<Result<WorkResult>> getDerivation() noexcept; + kj::Promise<Result<WorkResult>> loadDerivation() noexcept; + kj::Promise<Result<WorkResult>> haveDerivation() noexcept; + kj::Promise<Result<WorkResult>> outputsSubstitutionTried() noexcept; + kj::Promise<Result<WorkResult>> gaveUpOnSubstitution() noexcept; + kj::Promise<Result<WorkResult>> closureRepaired() noexcept; + kj::Promise<Result<WorkResult>> inputsRealised() noexcept; + kj::Promise<Result<WorkResult>> tryToBuild() noexcept; + virtual kj::Promise<Result<WorkResult>> tryLocalBuild() noexcept; + kj::Promise<Result<WorkResult>> buildDone() noexcept; - kj::Promise<Result<WorkResult>> resolvedFinished(bool inBuildSlot) noexcept; + kj::Promise<Result<WorkResult>> resolvedFinished() noexcept; /** * Is the build hook willing to perform the build? */ - HookReply tryBuildHook(bool inBuildSlot); + HookReply tryBuildHook(); virtual int getChildStatus(); @@ -312,13 +310,19 @@ struct DerivationGoal : public Goal virtual void cleanupPostOutputsRegisteredModeCheck(); virtual void cleanupPostOutputsRegisteredModeNonCheck(); - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; - void handleEOF(int fd) override; +protected: + kj::TimePoint lastChildActivity = kj::minValue; + + kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept; + kj::Promise<Outcome<void, Finished>> + handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept; + kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept; + kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept; + kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept; + Finished tooMuchLogs(); void flushLine(); +public: /** * Wrappers around the corresponding Store methods that first consult the * derivation. This is currently needed because when there is no drv file @@ -357,6 +361,11 @@ struct DerivationGoal : public Goal void waiteeDone(GoalPtr waitee) override; + virtual bool respectsTimeouts() + { + return false; + } + StorePathSet exportReferences(const StorePathSet & storePaths); JobCategory jobCategory() const override { diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index 7986123cc..9ffa33d7b 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -4,6 +4,9 @@ #include "worker.hh" #include "substitution-goal.hh" #include "signals.hh" +#include <kj/array.h> +#include <kj/async.h> +#include <kj/vector.h> namespace nix { @@ -16,33 +19,32 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal( : Goal(worker, isDependency) , id(id) { - state = &DrvOutputSubstitutionGoal::init; name = fmt("substitution of '%s'", id.to_string()); trace("created"); } -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::init(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept try { trace("init"); /* If the derivation already exists, we’re done */ if (worker.store.queryRealisation(id)) { - return {Finished{ecSuccess, std::move(buildResult)}}; + co_return Finished{ecSuccess, std::move(buildResult)}; } subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>(); - return tryNext(inBuildSlot); + co_return co_await tryNext(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::tryNext(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::tryNext() noexcept try { trace("trying next substituter"); - if (!inBuildSlot) { - return {WaitForSlot{}}; + if (!slotToken.valid()) { + slotToken = co_await worker.substitutions.acquire(); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); @@ -59,7 +61,7 @@ try { /* Hack: don't indicate failure if there were no substituters. In that case the calling derivation should just do a build. */ - return {Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)}}; + co_return Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)}; } sub = subs.front(); @@ -69,25 +71,26 @@ try { some other error occurs), so it must not touch `this`. So put the shared state in a separate refcounted object. */ downloadState = std::make_shared<DownloadState>(); - downloadState->outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>(); + downloadState->outPipe = kj::mv(pipe.fulfiller); downloadState->result = std::async(std::launch::async, [downloadState{downloadState}, id{id}, sub{sub}] { + Finally updateStats([&]() { downloadState->outPipe->fulfill(); }); ReceiveInterrupts receiveInterrupts; - Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); }); return sub->queryRealisation(id); }); - state = &DrvOutputSubstitutionGoal::realisationFetched; - return {WaitForWorld{{downloadState->outPipe.readSide.get()}, true}}; + co_await pipe.promise; + co_return co_await realisationFetched(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched() noexcept try { - worker.childTerminated(this); maintainRunningSubstitutions.reset(); + slotToken = {}; try { outputInfo = downloadState->result.get(); @@ -97,10 +100,10 @@ try { } if (!outputInfo) { - return tryNext(inBuildSlot); + co_return co_await tryNext(); } - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { if (depId != id) { if (auto localOutputInfo = worker.store.queryRealisation(depId); @@ -114,25 +117,23 @@ try { worker.store.printStorePath(localOutputInfo->outPath), worker.store.printStorePath(depPath) ); - return tryNext(inBuildSlot); + co_return co_await tryNext(); } - result.goals.insert(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId)); + dependencies.add(worker.goalFactory().makeDrvOutputSubstitutionGoal(depId)); } } - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(outputInfo->outPath)); - if (result.goals.empty()) { - return outPathValid(inBuildSlot); - } else { - state = &DrvOutputSubstitutionGoal::outPathValid; - return {std::move(result)}; + if (!dependencies.empty()) { + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await outPathValid(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::outPathValid(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::outPathValid() noexcept try { assert(outputInfo); trace("output path substituted"); @@ -159,17 +160,4 @@ try { return {std::current_exception()}; } -std::string DrvOutputSubstitutionGoal::key() -{ - /* "a$" ensures substitution goals happen before derivation - goals. */ - return "a$" + std::string(id.to_string()); -} - -kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept -{ - return (this->*state)(inBuildSlot); -} - - } diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index f33196665..86020705e 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -45,7 +45,7 @@ class DrvOutputSubstitutionGoal : public Goal { struct DownloadState { - Pipe outPipe; + kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe; std::future<std::shared_ptr<const Realisation>> result; }; @@ -65,20 +65,12 @@ public: std::optional<ContentAddress> ca = std::nullopt ); - typedef kj::Promise<Result<WorkResult>> (DrvOutputSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept; - GoalState state; - - kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> realisationFetched(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> outPathValid(bool inBuildSlot) noexcept; + kj::Promise<Result<WorkResult>> tryNext() noexcept; + kj::Promise<Result<WorkResult>> realisationFetched() noexcept; + kj::Promise<Result<WorkResult>> outPathValid() noexcept; kj::Promise<Result<WorkResult>> finished() noexcept; - Finished timedOut(Error && ex) override { abort(); }; - - std::string key() override; - - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; JobCategory jobCategory() const override { return JobCategory::Substitution; diff --git a/src/libstore/build/entry-points.cc b/src/libstore/build/entry-points.cc index a0f18a02c..27c341295 100644 --- a/src/libstore/build/entry-points.cc +++ b/src/libstore/build/entry-points.cc @@ -17,9 +17,9 @@ void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMod Worker worker(*this, evalStore ? *evalStore : *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - Goals goals; + Worker::Targets goals; for (auto & br : reqs) - goals.insert(gf.makeGoal(br, buildMode)); + goals.emplace(gf.makeGoal(br, buildMode)); return goals; }); @@ -60,11 +60,11 @@ std::vector<KeyedBuildResult> Store::buildPathsWithResults( std::vector<std::pair<const DerivedPath &, GoalPtr>> state; auto goals = runWorker(worker, [&](GoalFactory & gf) { - Goals goals; + Worker::Targets goals; for (const auto & req : reqs) { auto goal = gf.makeGoal(req, buildMode); - goals.insert(goal); - state.push_back({req, goal}); + state.push_back({req, goal.first}); + goals.emplace(std::move(goal)); } return goals; }); @@ -84,8 +84,10 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat Worker worker(*this, *this, aio); try { - auto goals = runWorker(worker, [&](GoalFactory & gf) -> Goals { - return Goals{gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)}; + auto goals = runWorker(worker, [&](GoalFactory & gf) { + Worker::Targets goals; + goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)); + return goals; }); auto goal = *goals.begin(); return goal->buildResult.restrictTo(DerivedPath::Built { @@ -110,7 +112,9 @@ void Store::ensurePath(const StorePath & path) Worker worker(*this, *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - return Goals{gf.makePathSubstitutionGoal(path)}; + Worker::Targets goals; + goals.emplace(gf.makePathSubstitutionGoal(path)); + return goals; }); auto goal = *goals.begin(); @@ -130,7 +134,9 @@ void Store::repairPath(const StorePath & path) Worker worker(*this, *this, aio); auto goals = runWorker(worker, [&](GoalFactory & gf) { - return Goals{gf.makePathSubstitutionGoal(path, Repair)}; + Worker::Targets goals; + goals.emplace(gf.makePathSubstitutionGoal(path, Repair)); + return goals; }); auto goal = *goals.begin(); @@ -140,14 +146,16 @@ void Store::repairPath(const StorePath & path) auto info = queryPathInfo(path); if (info->deriver && isValidPath(*info->deriver)) { worker.run([&](GoalFactory & gf) { - return Goals{gf.makeGoal( + Worker::Targets goals; + goals.emplace(gf.makeGoal( DerivedPath::Built{ .drvPath = makeConstantStorePathRef(*info->deriver), // FIXME: Should just build the specific output we need. .outputs = OutputsSpec::All{}, }, bmRepair - )}; + )); + return goals; }); } else throw Error(worker.failingExitStatus(), "cannot repair path '%s'", printStorePath(path)); diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 82861ad2b..cfdb6717c 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -1,18 +1,57 @@ #include "goal.hh" +#include "async-collect.hh" +#include "worker.hh" +#include <kj/time.h> namespace nix { -bool CompareGoalPtrs::operator() (const GoalPtr & a, const GoalPtr & b) const { - std::string s1 = a->key(); - std::string s2 = b->key(); - return s1 < s2; -} - - void Goal::trace(std::string_view s) { debug("%1%: %2%", name, s); } +kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile() +try { + trace("wait for a while"); + /* If we are polling goals that are waiting for a lock, then wake + up after a few seconds at most. */ + co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS); + co_return StillAlive{}; +} catch (...) { + co_return std::current_exception(); +} + +kj::Promise<Result<Goal::WorkResult>> +Goal::waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept +try { + auto left = dependencies.size(); + for (auto & [dep, p] : dependencies) { + p = p.then([this, dep, &left] { + left--; + trace(fmt("waitee '%s' done; %d left", dep->name, left)); + + if (dep->exitCode != Goal::ecSuccess) ++nrFailed; + if (dep->exitCode == Goal::ecNoSubstituters) ++nrNoSubstituters; + if (dep->exitCode == Goal::ecIncompleteClosure) ++nrIncompleteClosure; + }).eagerlyEvaluate(nullptr); + } + + auto collectDeps = asyncCollect(std::move(dependencies)); + + while (auto item = co_await collectDeps.next()) { + auto & dep = *item; + + waiteeDone(dep); + + if (dep->exitCode == ecFailed && !settings.keepGoing) { + co_return result::success(StillAlive{}); + } + } + + co_return result::success(StillAlive{}); +} catch (...) { + co_return result::failure(std::current_exception()); +} + } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 189505308..17c3d85db 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -1,10 +1,12 @@ #pragma once ///@file +#include "async-semaphore.hh" #include "result.hh" #include "types.hh" #include "store-api.hh" #include "build-result.hh" +#include <concepts> // IWYU pragma: keep #include <kj/async.h> namespace nix { @@ -19,22 +21,11 @@ class Worker; * A pointer to a goal. */ typedef std::shared_ptr<Goal> GoalPtr; -typedef std::weak_ptr<Goal> WeakGoalPtr; - -struct CompareGoalPtrs { - bool operator() (const GoalPtr & a, const GoalPtr & b) const; -}; /** * Set of goals. */ -typedef std::set<GoalPtr, CompareGoalPtrs> Goals; -typedef std::set<WeakGoalPtr, std::owner_less<WeakGoalPtr>> WeakGoals; - -/** - * A map of paths to goals (and the other way around). - */ -typedef std::map<StorePath, WeakGoalPtr> WeakGoalMap; +typedef std::set<GoalPtr> Goals; /** * Used as a hint to the worker on how to schedule a particular goal. For example, @@ -70,17 +61,6 @@ struct Goal const bool isDependency; /** - * Goals that this goal is waiting for. - */ - Goals waitees; - - /** - * Goals waiting for this one to finish. Must use weak pointers - * here to prevent cycles. - */ - WeakGoals waiters; - - /** * Number of goals we are/were waiting for that have failed. */ size_t nrFailed = 0; @@ -112,19 +92,17 @@ struct Goal */ BuildResult buildResult; + // for use by Worker only. will go away once work() is a promise. + kj::Own<kj::PromiseFulfiller<void>> notify; + +protected: + AsyncSemaphore::Token slotToken; + public: + struct Finished; + struct [[nodiscard]] StillAlive {}; - struct [[nodiscard]] WaitForSlot {}; - struct [[nodiscard]] WaitForAWhile {}; - struct [[nodiscard]] ContinueImmediately {}; - struct [[nodiscard]] WaitForGoals { - Goals goals; - }; - struct [[nodiscard]] WaitForWorld { - std::set<int> fds; - bool inBuildSlot; - }; struct [[nodiscard]] Finished { ExitCode exitCode; BuildResult result; @@ -137,17 +115,26 @@ public: struct [[nodiscard]] WorkResult : std::variant< StillAlive, - WaitForSlot, - WaitForAWhile, - ContinueImmediately, - WaitForGoals, - WaitForWorld, Finished> { WorkResult() = delete; using variant::variant; }; +protected: + kj::Promise<Result<WorkResult>> waitForAWhile(); + kj::Promise<Result<WorkResult>> + waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept; + + template<std::derived_from<Goal>... G> + kj::Promise<Result<Goal::WorkResult>> + waitForGoals(std::pair<std::shared_ptr<G>, kj::Promise<void>>... goals) noexcept + { + return waitForGoals(kj::arrOf<std::pair<GoalPtr, kj::Promise<void>>>(std::move(goals)...)); + } + +public: + /** * Exception containing an error message, if any. */ @@ -163,24 +150,10 @@ public: trace("goal destroyed"); } - virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0; + virtual kj::Promise<Result<WorkResult>> work() noexcept = 0; virtual void waiteeDone(GoalPtr waitee) { } - virtual WorkResult handleChildOutput(int fd, std::string_view data) - { - abort(); - } - - virtual void handleEOF(int fd) - { - } - - virtual bool respectsTimeouts() - { - return false; - } - void trace(std::string_view s); std::string getName() const @@ -188,15 +161,6 @@ public: return name; } - /** - * Callback in case of a timeout. It should wake up its waiters, - * get rid of any running child processes that are being monitored - * by the worker (important!), etc. - */ - virtual Finished timedOut(Error && ex) = 0; - - virtual std::string key() = 0; - virtual void cleanup() { } /** diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index 4baa525d9..f3d0bc8b4 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -121,8 +121,6 @@ LocalStore & LocalDerivationGoal::getLocalStore() void LocalDerivationGoal::killChild() { if (pid) { - worker.childTerminated(this); - /* If we're using a build user, then there is a tricky race condition: if we kill the build user before the child has done its setuid() to the build user uid, then it won't be @@ -149,17 +147,18 @@ void LocalDerivationGoal::killSandbox(bool getStats) } -kj::Promise<Result<Goal::WorkResult>> LocalDerivationGoal::tryLocalBuild(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> LocalDerivationGoal::tryLocalBuild() noexcept try { +retry: #if __APPLE__ additionalSandboxProfile = parsedDrv->getStringAttr("__sandboxProfile").value_or(""); #endif - if (!inBuildSlot) { - state = &DerivationGoal::tryToBuild; + if (!slotToken.valid()) { outputLocks.unlock(); - if (0U != settings.maxBuildJobs) { - return {WaitForSlot{}}; + if (worker.localBuilds.capacity() > 0) { + slotToken = co_await worker.localBuilds.acquire(); + co_return co_await tryToBuild(); } if (getMachines().empty()) { throw Error( @@ -214,7 +213,9 @@ try { if (!actLock) actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting, fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); - return {WaitForAWhile{}}; + (co_await waitForAWhile()).value(); + // we can loop very often, and `co_return co_await` always allocates a new frame + goto retry; } } @@ -243,24 +244,29 @@ try { try { /* Okay, we have to build. */ - auto fds = startBuilder(); - - /* This state will be reached when we get EOF on the child's - log pipe. */ - state = &DerivationGoal::buildDone; + auto promise = startBuilder(); started(); - return {WaitForWorld{std::move(fds), true}}; + auto r = co_await promise; + if (r.has_value()) { + // all good so far + } else if (r.has_error()) { + co_return r.assume_error(); + } else { + co_return r.assume_exception(); + } } catch (BuildError & e) { outputLocks.unlock(); buildUser.reset(); auto report = done(BuildResult::InputRejected, {}, std::move(e)); report.permanentFailure = true; - return {std::move(report)}; + co_return report; } + + co_return co_await buildDone(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } @@ -390,7 +396,9 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck() cleanupPostOutputsRegisteredModeCheck(); } -std::set<int> LocalDerivationGoal::startBuilder() +// NOTE this one isn't noexcept because it's called from places that expect +// exceptions to signal failure to launch. we should change this some time. +kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder() { if ((buildUser && buildUser->getUIDCount() != 1) #if __linux__ @@ -779,7 +787,7 @@ std::set<int> LocalDerivationGoal::startBuilder() msgs.push_back(std::move(msg)); } - return {builderOutPTY.get()}; + return handleChildOutput(); } @@ -1361,13 +1369,20 @@ void LocalDerivationGoal::runChild() bool setUser = true; - /* Make the contents of netrc available to builtin:fetchurl - (which may run under a different uid and/or in a sandbox). */ + /* Make the contents of netrc and the CA certificate bundle + available to builtin:fetchurl (which may run under a + different uid and/or in a sandbox). */ std::string netrcData; - try { - if (drv->isBuiltin() && drv->builder == "builtin:fetchurl" && !derivationType->isSandboxed()) + std::string caFileData; + if (drv->isBuiltin() && drv->builder == "builtin:fetchurl" && !derivationType->isSandboxed()) { + try { netrcData = readFile(settings.netrcFile); - } catch (SysError &) { } + } catch (SysError &) { } + + try { + caFileData = readFile(settings.caFile); + } catch (SysError &) { } + } #if __linux__ if (useChroot) { @@ -1802,7 +1817,7 @@ void LocalDerivationGoal::runChild() e.second = rewriteStrings(e.second, inputRewrites); if (drv->builder == "builtin:fetchurl") - builtinFetchurl(drv2, netrcData); + builtinFetchurl(drv2, netrcData, caFileData); else if (drv->builder == "builtin:buildenv") builtinBuildenv(drv2); else if (drv->builder == "builtin:unpack-channel") diff --git a/src/libstore/build/local-derivation-goal.hh b/src/libstore/build/local-derivation-goal.hh index cd040bc15..44bcd2ffe 100644 --- a/src/libstore/build/local-derivation-goal.hh +++ b/src/libstore/build/local-derivation-goal.hh @@ -182,7 +182,7 @@ struct LocalDerivationGoal : public DerivationGoal * Create a LocalDerivationGoal without an on-disk .drv file, * possibly a platform-specific subclass */ - static std::shared_ptr<LocalDerivationGoal> makeLocalDerivationGoal( + static std::unique_ptr<LocalDerivationGoal> makeLocalDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, Worker & worker, @@ -194,7 +194,7 @@ struct LocalDerivationGoal : public DerivationGoal * Create a LocalDerivationGoal for an on-disk .drv file, * possibly a platform-specific subclass */ - static std::shared_ptr<LocalDerivationGoal> makeLocalDerivationGoal( + static std::unique_ptr<LocalDerivationGoal> makeLocalDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, @@ -213,12 +213,12 @@ struct LocalDerivationGoal : public DerivationGoal /** * The additional states. */ - kj::Promise<Result<WorkResult>> tryLocalBuild(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> tryLocalBuild() noexcept override; /** * Start building a derivation. */ - std::set<int> startBuilder(); + kj::Promise<Outcome<void, Finished>> startBuilder(); /** * Fill in the environment for the builder. diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index bd0ffcb9b..8088bf668 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -3,6 +3,8 @@ #include "nar-info.hh" #include "signals.hh" #include "finally.hh" +#include <kj/array.h> +#include <kj/vector.h> namespace nix { @@ -18,7 +20,6 @@ PathSubstitutionGoal::PathSubstitutionGoal( , repair(repair) , ca(ca) { - state = &PathSubstitutionGoal::init; name = fmt("substitution of '%s'", worker.store.printStorePath(this->storePath)); trace("created"); maintainExpectedSubstitutions = worker.expectedSubstitutions.addTemporarily(1); @@ -45,13 +46,7 @@ Goal::Finished PathSubstitutionGoal::done( } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept -{ - return (this->*state)(inBuildSlot); -} - - -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::init(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept try { trace("init"); @@ -67,13 +62,13 @@ try { subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>(); - return tryNext(inBuildSlot); + return tryNext(); } catch (...) { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryNext(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryNext() noexcept try { trace("trying next substituter"); @@ -89,10 +84,10 @@ try { /* Hack: don't indicate failure if there were no substituters. In that case the calling derivation should just do a build. */ - return {done( + co_return done( substituterFailed ? ecFailed : ecNoSubstituters, BuildResult::NoSubstituters, - fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath)))}; + fmt("path '%s' is required, but there is no substituter that can build it", worker.store.printStorePath(storePath))); } sub = subs.front(); @@ -105,26 +100,28 @@ try { if (sub->storeDir == worker.store.storeDir) assert(subPath == storePath); } else if (sub->storeDir != worker.store.storeDir) { - return tryNext(inBuildSlot); + co_return co_await tryNext(); } - try { - // FIXME: make async - info = sub->queryPathInfo(subPath ? *subPath : storePath); - } catch (InvalidPath &) { - return tryNext(inBuildSlot); - } catch (SubstituterDisabled &) { - if (settings.tryFallback) { - return tryNext(inBuildSlot); - } - throw; - } catch (Error & e) { - if (settings.tryFallback) { - logError(e.info()); - return tryNext(inBuildSlot); + do { + try { + // FIXME: make async + info = sub->queryPathInfo(subPath ? *subPath : storePath); + break; + } catch (InvalidPath &) { + } catch (SubstituterDisabled &) { + if (!settings.tryFallback) { + throw; + } + } catch (Error & e) { + if (settings.tryFallback) { + logError(e.info()); + } else { + throw; + } } - throw; - } + co_return co_await tryNext(); + } while (false); if (info->path != storePath) { if (info->isContentAddressed(*sub) && info->references.empty()) { @@ -134,7 +131,7 @@ try { } else { printError("asked '%s' for '%s' but got '%s'", sub->getUri(), worker.store.printStorePath(storePath), sub->printStorePath(info->path)); - return tryNext(inBuildSlot); + co_return co_await tryNext(); } } @@ -155,28 +152,26 @@ try { { warn("ignoring substitute for '%s' from '%s', as it's not signed by any of the keys in 'trusted-public-keys'", worker.store.printStorePath(storePath), sub->getUri()); - return tryNext(inBuildSlot); + co_return co_await tryNext(); } /* To maintain the closure invariant, we first have to realise the paths referenced by this one. */ - WaitForGoals result; + kj::Vector<std::pair<GoalPtr, kj::Promise<void>>> dependencies; for (auto & i : info->references) if (i != storePath) /* ignore self-references */ - result.goals.insert(worker.goalFactory().makePathSubstitutionGoal(i)); + dependencies.add(worker.goalFactory().makePathSubstitutionGoal(i)); - if (result.goals.empty()) {/* to prevent hang (no wake-up event) */ - return referencesValid(inBuildSlot); - } else { - state = &PathSubstitutionGoal::referencesValid; - return {std::move(result)}; + if (!dependencies.empty()) {/* to prevent hang (no wake-up event) */ + (co_await waitForGoals(dependencies.releaseAsArray())).value(); } + co_return co_await referencesValid(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::referencesValid(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::referencesValid() noexcept try { trace("all references realised"); @@ -191,33 +186,33 @@ try { if (i != storePath) /* ignore self-references */ assert(worker.store.isValidPath(i)); - state = &PathSubstitutionGoal::tryToRun; - return tryToRun(inBuildSlot); + return tryToRun(); } catch (...) { return {std::current_exception()}; } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryToRun(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::tryToRun() noexcept try { trace("trying to run"); - if (!inBuildSlot) { - return {WaitForSlot{}}; + if (!slotToken.valid()) { + slotToken = co_await worker.substitutions.acquire(); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); - outPipe.create(); + auto pipe = kj::newPromiseAndCrossThreadFulfiller<void>(); + outPipe = kj::mv(pipe.fulfiller); thr = std::async(std::launch::async, [this]() { + /* Wake up the worker loop when we're done. */ + Finally updateStats([this]() { outPipe->fulfill(); }); + auto & fetchPath = subPath ? *subPath : storePath; try { ReceiveInterrupts receiveInterrupts; - /* Wake up the worker loop when we're done. */ - Finally updateStats([this]() { outPipe.writeSide.close(); }); - Activity act(*logger, actSubstitute, Logger::Fields{worker.store.printStorePath(storePath), sub->getUri()}); PushActivity pact(act.id); @@ -233,39 +228,39 @@ try { } }); - state = &PathSubstitutionGoal::finished; - return {WaitForWorld{{outPipe.readSide.get()}, true}}; + co_await pipe.promise; + co_return co_await finished(); } catch (...) { - return {std::current_exception()}; + co_return result::failure(std::current_exception()); } -kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished(bool inBuildSlot) noexcept +kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::finished() noexcept try { trace("substitute finished"); - worker.childTerminated(this); - - try { - thr.get(); - } catch (std::exception & e) { - printError(e.what()); - - /* Cause the parent build to fail unless --fallback is given, - or the substitute has disappeared. The latter case behaves - the same as the substitute never having existed in the - first place. */ + do { try { - throw; - } catch (SubstituteGone &) { - } catch (...) { - substituterFailed = true; + slotToken = {}; + thr.get(); + break; + } catch (std::exception & e) { + printError(e.what()); + + /* Cause the parent build to fail unless --fallback is given, + or the substitute has disappeared. The latter case behaves + the same as the substitute never having existed in the + first place. */ + try { + throw; + } catch (SubstituteGone &) { + } catch (...) { + substituterFailed = true; + } } - /* Try the next substitute. */ - state = &PathSubstitutionGoal::tryNext; - return tryNext(inBuildSlot); - } + co_return co_await tryNext(); + } while (false); worker.markContentsGood(storePath); @@ -282,15 +277,9 @@ try { worker.doneNarSize += maintainExpectedNar.delta(); maintainExpectedNar.reset(); - return {done(ecSuccess, BuildResult::Substituted)}; + co_return done(ecSuccess, BuildResult::Substituted); } catch (...) { - return {std::current_exception()}; -} - - -Goal::WorkResult PathSubstitutionGoal::handleChildOutput(int fd, std::string_view data) -{ - return StillAlive{}; + co_return result::failure(std::current_exception()); } @@ -300,10 +289,7 @@ void PathSubstitutionGoal::cleanup() if (thr.valid()) { // FIXME: signal worker thread to quit. thr.get(); - worker.childTerminated(this); } - - outPipe.close(); } catch (...) { ignoreException(); } diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 3c97b19fd..dc701bcba 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -46,7 +46,7 @@ struct PathSubstitutionGoal : public Goal /** * Pipe for the substituter's standard output. */ - Pipe outPipe; + kj::Own<kj::CrossThreadPromiseFulfiller<void>> outPipe; /** * The substituter thread. @@ -67,9 +67,6 @@ struct PathSubstitutionGoal : public Goal NotifyingCounter<uint64_t>::Bump maintainExpectedSubstitutions, maintainRunningSubstitutions, maintainExpectedNar, maintainExpectedDownload; - typedef kj::Promise<Result<WorkResult>> (PathSubstitutionGoal::*GoalState)(bool inBuildSlot) noexcept; - GoalState state; - /** * Content address for recomputing store path */ @@ -90,32 +87,15 @@ public: ); ~PathSubstitutionGoal(); - Finished timedOut(Error && ex) override { abort(); }; - - /** - * We prepend "a$" to the key name to ensure substitution goals - * happen before derivation goals. - */ - std::string key() override - { - return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); - } - - kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override; + kj::Promise<Result<WorkResult>> work() noexcept override; /** * The states. */ - kj::Promise<Result<WorkResult>> init(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> tryNext(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> referencesValid(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> tryToRun(bool inBuildSlot) noexcept; - kj::Promise<Result<WorkResult>> finished(bool inBuildSlot) noexcept; - - /** - * Callback used by the worker to write to the log. - */ - WorkResult handleChildOutput(int fd, std::string_view data) override; + kj::Promise<Result<WorkResult>> tryNext() noexcept; + kj::Promise<Result<WorkResult>> referencesValid() noexcept; + kj::Promise<Result<WorkResult>> tryToRun() noexcept; + kj::Promise<Result<WorkResult>> finished() noexcept; /* Called by destructor, can't be overridden */ void cleanup() override final; diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index ee45c7e3f..5be706e42 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -7,10 +7,19 @@ #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep -#include <poll.h> - namespace nix { +namespace { +struct ErrorHandler : kj::TaskSet::ErrorHandler +{ + void taskFailed(kj::Exception && e) override + { + printError("unexpected async failure in Worker: %s", kj::str(e).cStr()); + abort(); + } +} errorHandler; +} + Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) : act(*logger, actRealise) , actDerivations(*logger, actBuilds) @@ -18,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + /* Make sure that we are always allowed to run at least one substitution. + This prevents infinite waiting. */ + , substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs)) + , localBuilds(settings.maxBuildJobs) + , children(errorHandler) { /* Debugging: prevent recursive workers. */ - nrLocalBuilds = 0; - nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -33,6 +44,7 @@ Worker::~Worker() are in trouble, since goals may call childTerminated() etc. in their destructors). */ topGoals.clear(); + children.clear(); assert(expectedSubstitutions == 0); assert(expectedDownloadSize == 0); @@ -40,111 +52,128 @@ Worker::~Worker() } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoalCommon( - const StorePath & drvPath, - const OutputsSpec & wantedOutputs, - std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal) +template<typename ID, std::derived_from<Goal> G> +std::pair<std::shared_ptr<G>, kj::Promise<void>> Worker::makeGoalCommon( + std::map<ID, CachedGoal<G>> & map, + const ID & key, + InvocableR<std::unique_ptr<G>> auto create, + std::invocable<G &> auto modify +) { - std::weak_ptr<DerivationGoal> & goal_weak = derivationGoals[drvPath]; - std::shared_ptr<DerivationGoal> goal = goal_weak.lock(); + auto [it, _inserted] = map.try_emplace(key); + auto & goal_weak = it->second; + auto goal = goal_weak.goal.lock(); if (!goal) { - goal = mkDrvGoal(); - goal_weak = goal; - wakeUp(goal); + goal = create(); + goal->notify = std::move(goal_weak.fulfiller); + goal_weak.goal = goal; + // do not start working immediately, this round of the event loop + // may have more calls to this function lined up that'll also run + // modify(). starting early can then cause the goals to misbehave + childStarted(goal, kj::evalLater([goal] { return goal->work(); })); } else { - goal->addWantedOutputs(wantedOutputs); + modify(*goal); } - return goal; + return {goal, goal_weak.promise->addBranch()}; } -std::shared_ptr<DerivationGoal> Worker::makeDerivationGoal(const StorePath & drvPath, - const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeDerivationGoal( + const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode +) { - return makeDerivationGoalCommon( + return makeGoalCommon( + derivationGoals, drvPath, - wantedOutputs, - [&]() -> std::shared_ptr<DerivationGoal> { + [&]() -> std::unique_ptr<DerivationGoal> { return !dynamic_cast<LocalStore *>(&store) - ? std::make_shared<DerivationGoal>( + ? std::make_unique<DerivationGoal>( drvPath, wantedOutputs, *this, running, buildMode ) : LocalDerivationGoal::makeLocalDerivationGoal( drvPath, wantedOutputs, *this, running, buildMode ); - } + }, + [&](DerivationGoal & g) { g.addWantedOutputs(wantedOutputs); } ); } -std::shared_ptr<DerivationGoal> Worker::makeBasicDerivationGoal(const StorePath & drvPath, - const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode) +std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> Worker::makeBasicDerivationGoal( + const StorePath & drvPath, + const BasicDerivation & drv, + const OutputsSpec & wantedOutputs, + BuildMode buildMode +) { - return makeDerivationGoalCommon( + return makeGoalCommon( + derivationGoals, drvPath, - wantedOutputs, - [&]() -> std::shared_ptr<DerivationGoal> { + [&]() -> std::unique_ptr<DerivationGoal> { return !dynamic_cast<LocalStore *>(&store) - ? std::make_shared<DerivationGoal>( + ? std::make_unique<DerivationGoal>( drvPath, drv, wantedOutputs, *this, running, buildMode ) : LocalDerivationGoal::makeLocalDerivationGoal( drvPath, drv, wantedOutputs, *this, running, buildMode ); - } + }, + [&](DerivationGoal & g) { g.addWantedOutputs(wantedOutputs); } ); } -std::shared_ptr<PathSubstitutionGoal> Worker::makePathSubstitutionGoal(const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> +Worker::makePathSubstitutionGoal( + const StorePath & path, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<PathSubstitutionGoal> & goal_weak = substitutionGoals[path]; - auto goal = goal_weak.lock(); // FIXME - if (!goal) { - goal = std::make_shared<PathSubstitutionGoal>(path, *this, running, repair, ca); - goal_weak = goal; - wakeUp(goal); - } - return goal; + return makeGoalCommon( + substitutionGoals, + path, + [&] { return std::make_unique<PathSubstitutionGoal>(path, *this, running, repair, ca); }, + [&](auto &) {} + ); } -std::shared_ptr<DrvOutputSubstitutionGoal> Worker::makeDrvOutputSubstitutionGoal(const DrvOutput& id, RepairFlag repair, std::optional<ContentAddress> ca) +std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> +Worker::makeDrvOutputSubstitutionGoal( + const DrvOutput & id, RepairFlag repair, std::optional<ContentAddress> ca +) { - std::weak_ptr<DrvOutputSubstitutionGoal> & goal_weak = drvOutputSubstitutionGoals[id]; - auto goal = goal_weak.lock(); // FIXME - if (!goal) { - goal = std::make_shared<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); - goal_weak = goal; - wakeUp(goal); - } - return goal; + return makeGoalCommon( + drvOutputSubstitutionGoals, + id, + [&] { return std::make_unique<DrvOutputSubstitutionGoal>(id, *this, running, repair, ca); }, + [&](auto &) {} + ); } -GoalPtr Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) +std::pair<GoalPtr, kj::Promise<void>> Worker::makeGoal(const DerivedPath & req, BuildMode buildMode) { return std::visit(overloaded { - [&](const DerivedPath::Built & bfd) -> GoalPtr { + [&](const DerivedPath::Built & bfd) -> std::pair<GoalPtr, kj::Promise<void>> { if (auto bop = std::get_if<DerivedPath::Opaque>(&*bfd.drvPath)) return makeDerivationGoal(bop->path, bfd.outputs, buildMode); else throw UnimplementedError("Building dynamic derivations in one shot is not yet implemented."); }, - [&](const DerivedPath::Opaque & bo) -> GoalPtr { + [&](const DerivedPath::Opaque & bo) -> std::pair<GoalPtr, kj::Promise<void>> { return makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); }, }, req.raw()); } -template<typename K, typename G> -static void removeGoal(std::shared_ptr<G> goal, std::map<K, std::weak_ptr<G>> & goalMap) +template<typename G> +static void removeGoal(std::shared_ptr<G> goal, auto & goalMap) { /* !!! inefficient */ for (auto i = goalMap.begin(); i != goalMap.end(); ) - if (i->second.lock() == goal) { + if (i->second.goal.lock() == goal) { auto j = i; ++j; goalMap.erase(i); i = j; @@ -165,33 +194,8 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f) hashMismatch |= f.hashMismatch; checkMismatch |= f.checkMismatch; - for (auto & i : goal->waiters) { - if (GoalPtr waiting = i.lock()) { - assert(waiting->waitees.count(goal)); - waiting->waitees.erase(goal); - - waiting->trace(fmt("waitee '%s' done; %d left", goal->name, waiting->waitees.size())); - - if (f.exitCode != Goal::ecSuccess) ++waiting->nrFailed; - if (f.exitCode == Goal::ecNoSubstituters) ++waiting->nrNoSubstituters; - if (f.exitCode == Goal::ecIncompleteClosure) ++waiting->nrIncompleteClosure; - - if (waiting->waitees.empty() || (f.exitCode == Goal::ecFailed && !settings.keepGoing)) { - /* If we failed and keepGoing is not set, we remove all - remaining waitees. */ - for (auto & i : waiting->waitees) { - i->waiters.extract(waiting); - } - waiting->waitees.clear(); - - wakeUp(waiting); - } - - waiting->waiteeDone(goal); - } - } - goal->waiters.clear(); removeGoal(goal); + goal->notify->fulfill(); goal->cleanup(); } @@ -199,21 +203,14 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) { std::visit( overloaded{ - [&](Goal::StillAlive) {}, - [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, - [&](Goal::ContinueImmediately) { wakeUp(goal); }, - [&](Goal::WaitForGoals & w) { - for (auto & dep : w.goals) { - goal->waitees.insert(dep); - dep->waiters.insert(goal); - } + [&](Goal::StillAlive) { + childStarted(goal, kj::evalLater([goal] { return goal->work(); })); }, - [&](Goal::WaitForWorld & w) { childStarted(goal, w.fds, w.inBuildSlot); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how ); + updateStatistics(); } void Worker::removeGoal(GoalPtr goal) @@ -237,95 +234,37 @@ void Worker::removeGoal(GoalPtr goal) } -void Worker::wakeUp(GoalPtr goal) +void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise) { - goal->trace("woken up"); - awake.insert(goal); -} - - -void Worker::childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot) -{ - Child child; - child.goal = goal; - child.goal2 = goal.get(); - child.fds = fds; - child.timeStarted = child.lastOutput = steady_time_point::clock::now(); - child.inBuildSlot = inBuildSlot; - children.emplace_back(child); - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - nrSubstitutions++; - break; - case JobCategory::Build: - nrLocalBuilds++; - break; - default: - abort(); - } - } + children.add(promise + .then([this, goal](auto result) { + if (result.has_value()) { + handleWorkResult(goal, std::move(result.assume_value())); + } else { + childException = result.assume_error(); + } + }) + .attach(Finally{[this, goal] { + childTerminated(goal); + }})); } -void Worker::childTerminated(Goal * goal) +void Worker::childTerminated(GoalPtr goal) { - auto i = std::find_if(children.begin(), children.end(), - [&](const Child & child) { return child.goal2 == goal; }); - if (i == children.end()) return; - - if (i->inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - assert(nrSubstitutions > 0); - nrSubstitutions--; - break; - case JobCategory::Build: - assert(nrLocalBuilds > 0); - nrLocalBuilds--; - break; - default: - abort(); - } - } - - children.erase(i); - - /* Wake up goals waiting for a build slot. */ - for (auto & j : wantingToBuild) { - GoalPtr goal = j.lock(); - if (goal) wakeUp(goal); + if (childFinished) { + childFinished->fulfill(); } - - wantingToBuild.clear(); } -void Worker::waitForBuildSlot(GoalPtr goal) -{ - goal->trace("wait for build slot"); - bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution; - if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) || - (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs)) - wakeUp(goal); /* we can do it right away */ - else - wantingToBuild.insert(goal); -} +kj::Promise<Result<void>> Worker::updateStatistics() +try { + while (true) { + statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire(); - -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); -} - - -void Worker::updateStatistics() -{ - // only update progress info while running. this notably excludes updating - // progress info while destroying, which causes the progress bar to assert - if (running && statisticsOutdated) { + // only update progress info while running. this notably excludes updating + // progress info while destroying, which causes the progress bar to assert actDerivations.progress( doneBuilds, expectedBuilds + doneBuilds, runningBuilds, failedBuilds ); @@ -338,11 +277,14 @@ void Worker::updateStatistics() act.setExpected(actFileTransfer, expectedDownloadSize + doneDownloadSize); act.setExpected(actCopyPath, expectedNarSize + doneNarSize); - statisticsOutdated = false; + // limit to 50fps. that should be more than good enough for anything we do + co_await aio.provider->getTimer().afterDelay(20 * kj::MILLISECONDS); } +} catch (...) { + co_return result::failure(std::current_exception()); } -Goals Worker::run(std::function<Goals (GoalFactory &)> req) +std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req) { auto _topGoals = req(goalFactory()); @@ -350,209 +292,77 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req) running = true; Finally const _stop([&] { running = false; }); - updateStatistics(); + topGoals.clear(); + for (auto & [goal, _promise] : _topGoals) { + topGoals.insert(goal); + } + + auto promise = runImpl().exclusiveJoin(updateStatistics()); + + // TODO GC interface? + if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0) { + // Periodically wake up to see if we need to run the garbage collector. + promise = promise.exclusiveJoin(boopGC(*localStore)); + } + + promise.wait(aio.waitScope).value(); - topGoals = _topGoals; + std::vector<GoalPtr> results; + for (auto & [i, _p] : _topGoals) { + results.push_back(i); + } + return results; +} +kj::Promise<Result<void>> Worker::runImpl() +try { debug("entered goal loop"); while (1) { checkInterrupt(); - // TODO GC interface? - if (auto localStore = dynamic_cast<LocalStore *>(&store)) - localStore->autoGC(false); - - /* Call every wake goal (in the ordering established by - CompareGoalPtrs). */ - while (!awake.empty() && !topGoals.empty()) { - Goals awake2; - for (auto & i : awake) { - GoalPtr goal = i.lock(); - if (goal) awake2.insert(goal); - } - awake.clear(); - for (auto & goal : awake2) { - checkInterrupt(); - /* Make sure that we are always allowed to run at least one substitution. - This prevents infinite waiting. */ - const bool inSlot = goal->jobCategory() == JobCategory::Substitution - ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) - : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); - - if (topGoals.empty()) break; // stuff may have been cancelled - } - } - if (topGoals.empty()) break; /* Wait for input. */ - if (!children.empty() || !waitingForAWhile.empty()) - waitForInput(); - else { - assert(!awake.empty()); + if (!children.isEmpty()) + (co_await waitForInput()).value(); + + if (childException) { + std::rethrow_exception(childException); } } /* If --keep-going is not set, it's possible that the main goal exited while some of its subgoals were still active. But if --keep-going *is* set, then they must all be finished now. */ - assert(!settings.keepGoing || awake.empty()); - assert(!settings.keepGoing || wantingToBuild.empty()); - assert(!settings.keepGoing || children.empty()); + assert(!settings.keepGoing || children.isEmpty()); - return _topGoals; + co_return result::success(); +} catch (...) { + co_return result::failure(std::current_exception()); } -void Worker::waitForInput() -{ - printMsg(lvlVomit, "waiting for children"); - - /* Process output from the file descriptors attached to the - children, namely log output and output path creation commands. - We also use this to detect child termination: if we get EOF on - the logger pipe of a build, we assume that the builder has - terminated. */ - - bool useTimeout = false; - long timeout = 0; - auto before = steady_time_point::clock::now(); - - /* If we're monitoring for silence on stdout/stderr, or if there - is a build timeout, then wait for input until the first - deadline for any child. */ - auto nearest = steady_time_point::max(); // nearest deadline - if (settings.minFree.get() != 0) - // Periodicallty wake up to see if we need to run the garbage collector. - nearest = before + std::chrono::seconds(10); - for (auto & i : children) { - if (auto goal = i.goal.lock()) { - if (!goal->respectsTimeouts()) continue; - if (0 != settings.maxSilentTime) - nearest = std::min(nearest, i.lastOutput + std::chrono::seconds(settings.maxSilentTime)); - if (0 != settings.buildTimeout) - nearest = std::min(nearest, i.timeStarted + std::chrono::seconds(settings.buildTimeout)); - } - } - if (nearest != steady_time_point::max()) { - timeout = std::max(1L, (long) std::chrono::duration_cast<std::chrono::seconds>(nearest - before).count()); - useTimeout = true; - } - - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - useTimeout = true; - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast<std::chrono::seconds>( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - - if (useTimeout) - vomit("sleeping %d seconds", timeout); - - /* Use select() to wait for the input side of any logger pipe to - become `available'. Note that `available' (i.e., non-blocking) - includes EOF. */ - std::vector<struct pollfd> pollStatus; - std::map<int, size_t> fdToPollStatus; - for (auto & i : children) { - for (auto & j : i.fds) { - pollStatus.push_back((struct pollfd) { .fd = j, .events = POLLIN }); - fdToPollStatus[j] = pollStatus.size() - 1; - } - } - - if (poll(pollStatus.data(), pollStatus.size(), - useTimeout ? timeout * 1000 : -1) == -1) { - if (errno == EINTR) return; - throw SysError("waiting for input"); +kj::Promise<Result<void>> Worker::boopGC(LocalStore & localStore) +try { + while (true) { + co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS); + localStore.autoGC(false); } +} catch (...) { + co_return result::failure(std::current_exception()); +} - auto after = steady_time_point::clock::now(); - - /* Process all available file descriptors. FIXME: this is - O(children * fds). */ - decltype(children)::iterator i; - for (auto j = children.begin(); j != children.end(); j = i) { - i = std::next(j); - - checkInterrupt(); - - GoalPtr goal = j->goal.lock(); - assert(goal); - - if (!goal->exitCode.has_value() && - 0 != settings.maxSilentTime && - goal->respectsTimeouts() && - after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) - { - handleWorkResult( - goal, - goal->timedOut(Error( - "%1% timed out after %2% seconds of silence", - goal->getName(), - settings.maxSilentTime - )) - ); - continue; - } - - else if (!goal->exitCode.has_value() && - 0 != settings.buildTimeout && - goal->respectsTimeouts() && - after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) - { - handleWorkResult( - goal, - goal->timedOut( - Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout) - ) - ); - continue; - } - - std::set<int> fds2(j->fds); - std::vector<unsigned char> buffer(4096); - for (auto & k : fds2) { - const auto fdPollStatusId = get(fdToPollStatus, k); - assert(fdPollStatusId); - assert(*fdPollStatusId < pollStatus.size()); - if (pollStatus.at(*fdPollStatusId).revents) { - ssize_t rd = ::read(k, buffer.data(), buffer.size()); - // FIXME: is there a cleaner way to handle pt close - // than EIO? Is this even standard? - if (rd == 0 || (rd == -1 && errno == EIO)) { - debug("%1%: got EOF", goal->getName()); - goal->handleEOF(k); - handleWorkResult(goal, Goal::ContinueImmediately{}); - j->fds.erase(k); - } else if (rd == -1) { - if (errno != EINTR) - throw SysError("%s: read failed", goal->getName()); - } else { - printMsg(lvlVomit, "%1%: read %2% bytes", - goal->getName(), rd); - std::string_view data(charptr_cast<char *>(buffer.data()), rd); - j->lastOutput = after; - handleWorkResult(goal, goal->handleChildOutput(k, data)); - } - } - } - } +kj::Promise<Result<void>> Worker::waitForInput() +try { + printMsg(lvlVomit, "waiting for children"); - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); - } + auto pair = kj::newPromiseAndFulfiller<void>(); + this->childFinished = kj::mv(pair.fulfiller); + co_await pair.promise; + co_return result::success(); +} catch (...) { + co_return result::failure(std::current_exception()); } diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 6735ea0b9..d6cde8384 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -1,6 +1,8 @@ #pragma once ///@file +#include "async-semaphore.hh" +#include "concepts.hh" #include "notifying-counter.hh" #include "types.hh" #include "lock.hh" @@ -18,37 +20,20 @@ namespace nix { struct DerivationGoal; struct PathSubstitutionGoal; class DrvOutputSubstitutionGoal; +class LocalStore; typedef std::chrono::time_point<std::chrono::steady_clock> steady_time_point; -/** - * A mapping used to remember for each child process to what goal it - * belongs, and file descriptors for receiving log data and output - * path creation commands. - */ -struct Child -{ - WeakGoalPtr goal; - Goal * goal2; // ugly hackery - std::set<int> fds; - bool inBuildSlot; - /** - * Time we last got output on stdout/stderr - */ - steady_time_point lastOutput; - steady_time_point timeStarted; -}; - /* Forward definition. */ struct HookInstance; class GoalFactory { public: - virtual std::shared_ptr<DerivationGoal> makeDerivationGoal( + virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal ) = 0; - virtual std::shared_ptr<DerivationGoal> makeBasicDerivationGoal( + virtual std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, @@ -58,12 +43,14 @@ public: /** * @ref SubstitutionGoal "substitution goal" */ - virtual std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal( + virtual std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> + makePathSubstitutionGoal( const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt ) = 0; - virtual std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal( + virtual std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> + makeDrvOutputSubstitutionGoal( const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt @@ -75,7 +62,8 @@ public: * It will be a `DerivationGoal` for a `DerivedPath::Built` or * a `SubstitutionGoal` for a `DerivedPath::Opaque`. */ - virtual GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0; + virtual std::pair<GoalPtr, kj::Promise<void>> + makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) = 0; }; // elaborate hoax to let goals access factory methods while hiding them from the public @@ -98,57 +86,32 @@ private: bool running = false; - /* Note: the worker should only have strong pointers to the - top-level goals. */ - /** * The top-level goals of the worker. */ Goals topGoals; - /** - * Goals that are ready to do some work. - */ - WeakGoals awake; - - /** - * Goals waiting for a build slot. - */ - WeakGoals wantingToBuild; - - /** - * Child processes currently running. - */ - std::list<Child> children; - - /** - * Number of build slots occupied. This includes local builds but does not - * include substitutions or remote builds via the build hook. - */ - unsigned int nrLocalBuilds; - - /** - * Number of substitution slots occupied. - */ - unsigned int nrSubstitutions; - + template<typename G> + struct CachedGoal + { + std::weak_ptr<G> goal; + kj::Own<kj::ForkedPromise<void>> promise; + kj::Own<kj::PromiseFulfiller<void>> fulfiller; + + CachedGoal() + { + auto pf = kj::newPromiseAndFulfiller<void>(); + promise = kj::heap(pf.promise.fork()); + fulfiller = std::move(pf.fulfiller); + } + }; /** * Maps used to prevent multiple instantiations of a goal for the * same derivation / path. */ - std::map<StorePath, std::weak_ptr<DerivationGoal>> derivationGoals; - std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals; - std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; - - /** - * Goals sleeping for a few seconds (polling a lock). - */ - WeakGoals waitingForAWhile; - - /** - * Last time the goals in `waitingForAWhile` where woken up. - */ - steady_time_point lastWokenUp; + std::map<StorePath, CachedGoal<DerivationGoal>> derivationGoals; + std::map<StorePath, CachedGoal<PathSubstitutionGoal>> substitutionGoals; + std::map<DrvOutput, CachedGoal<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals; /** * Cache for pathContentsGood(). @@ -179,29 +142,12 @@ private: void goalFinished(GoalPtr goal, Goal::Finished & f); void handleWorkResult(GoalPtr goal, Goal::WorkResult how); - /** - * Put `goal` to sleep until a build slot becomes available (which - * might be right away). - */ - void waitForBuildSlot(GoalPtr goal); - - /** - * Wait for a few seconds and then retry this goal. Used when - * waiting for a lock held by another process. This kind of - * polling is inefficient, but POSIX doesn't really provide a way - * to wait for multiple locks in the main select() loop. - */ - void waitForAWhile(GoalPtr goal); - - /** - * Wake up a goal (i.e., there is something for it to do). - */ - void wakeUp(GoalPtr goal); + kj::Own<kj::PromiseFulfiller<void>> childFinished; /** * Wait for input to become available. */ - void waitForInput(); + kj::Promise<Result<void>> waitForInput(); /** * Remove a dead goal. @@ -209,27 +155,34 @@ private: void removeGoal(GoalPtr goal); /** - * Registers a running child process. `inBuildSlot` means that - * the process counts towards the jobs limit. + * Registers a running child process. + */ + void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise); + + /** + * Unregisters a running child process. */ - void childStarted(GoalPtr goal, const std::set<int> & fds, - bool inBuildSlot); + void childTerminated(GoalPtr goal); /** * Pass current stats counters to the logger for progress bar updates. */ - void updateStatistics(); + kj::Promise<Result<void>> updateStatistics(); - bool statisticsOutdated = true; + AsyncSemaphore statisticsUpdateSignal{1}; + std::optional<AsyncSemaphore::Token> statisticsUpdateInhibitor; /** * Mark statistics as outdated, such that `updateStatistics` will be called. */ void updateStatisticsLater() { - statisticsOutdated = true; + statisticsUpdateInhibitor = {}; } + kj::Promise<Result<void>> runImpl(); + kj::Promise<Result<void>> boopGC(LocalStore & localStore); + public: const Activity act; @@ -239,7 +192,13 @@ public: Store & store; Store & evalStore; kj::AsyncIoContext & aio; + AsyncSemaphore substitutions, localBuilds; + +private: + kj::TaskSet children; + std::exception_ptr childException; +public: struct HookState { std::unique_ptr<HookInstance> instance; @@ -277,21 +236,35 @@ public: * @ref DerivationGoal "derivation goal" */ private: - std::shared_ptr<DerivationGoal> makeDerivationGoalCommon( - const StorePath & drvPath, const OutputsSpec & wantedOutputs, - std::function<std::shared_ptr<DerivationGoal>()> mkDrvGoal); - std::shared_ptr<DerivationGoal> makeDerivationGoal( + template<typename ID, std::derived_from<Goal> G> + std::pair<std::shared_ptr<G>, kj::Promise<void>> makeGoalCommon( + std::map<ID, CachedGoal<G>> & map, + const ID & key, + InvocableR<std::unique_ptr<G>> auto create, + std::invocable<G &> auto modify + ); + std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override; - std::shared_ptr<DerivationGoal> makeBasicDerivationGoal( + std::pair<std::shared_ptr<DerivationGoal>, kj::Promise<void>> makeBasicDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, BuildMode buildMode = bmNormal) override; /** * @ref SubstitutionGoal "substitution goal" */ - std::shared_ptr<PathSubstitutionGoal> makePathSubstitutionGoal(const StorePath & storePath, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override; - std::shared_ptr<DrvOutputSubstitutionGoal> makeDrvOutputSubstitutionGoal(const DrvOutput & id, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt) override; + std::pair<std::shared_ptr<PathSubstitutionGoal>, kj::Promise<void>> + makePathSubstitutionGoal( + const StorePath & storePath, + RepairFlag repair = NoRepair, + std::optional<ContentAddress> ca = std::nullopt + ) override; + std::pair<std::shared_ptr<DrvOutputSubstitutionGoal>, kj::Promise<void>> + makeDrvOutputSubstitutionGoal( + const DrvOutput & id, + RepairFlag repair = NoRepair, + std::optional<ContentAddress> ca = std::nullopt + ) override; /** * Make a goal corresponding to the `DerivedPath`. @@ -299,18 +272,16 @@ private: * It will be a `DerivationGoal` for a `DerivedPath::Built` or * a `SubstitutionGoal` for a `DerivedPath::Opaque`. */ - GoalPtr makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; + std::pair<GoalPtr, kj::Promise<void>> + makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; public: - /** - * Unregisters a running child process. - */ - void childTerminated(Goal * goal); + using Targets = std::map<GoalPtr, kj::Promise<void>>; /** * Loop until the specified top-level goals have finished. */ - Goals run(std::function<Goals (GoalFactory &)> req); + std::vector<GoalPtr> run(std::function<Targets (GoalFactory &)> req); /*** * The exit status in case of failure. diff --git a/src/libstore/builtins.hh b/src/libstore/builtins.hh index d201fb3ac..e20d14b90 100644 --- a/src/libstore/builtins.hh +++ b/src/libstore/builtins.hh @@ -6,7 +6,7 @@ namespace nix { // TODO: make pluggable. -void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData); +void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData, const std::string & caFileData); void builtinUnpackChannel(const BasicDerivation & drv); } diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 062ecdc14..b28eb01d0 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -7,7 +7,7 @@ namespace nix { -void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) +void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData, const std::string & caFileData) { /* Make the host's netrc data available. Too bad curl requires this to be stored in a file. It would be nice if we could just @@ -17,6 +17,9 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) writeFile(settings.netrcFile, netrcData, 0600); } + settings.caFile = "ca-certificates.crt"; + writeFile(settings.caFile, caFileData, 0600); + auto getAttr = [&](const std::string & name) { auto i = drv.env.find(name); if (i == drv.env.end()) throw Error("attribute '%s' missing", name); @@ -33,10 +36,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) auto fetch = [&](const std::string & url) { - /* No need to do TLS verification, because we check the hash of - the result anyway. */ FileTransferRequest request(url); - request.verifyTLS = false; auto raw = fileTransfer->download(std::move(request)); auto decompressor = makeDecompressionSource( diff --git a/src/libstore/globals.cc b/src/libstore/globals.cc index ffc2543ef..f43b759d2 100644 --- a/src/libstore/globals.cc +++ b/src/libstore/globals.cc @@ -443,7 +443,7 @@ static bool initLibStoreDone = false; void assertLibStoreInitialized() { if (!initLibStoreDone) { printError("The program must call nix::initNix() before calling any libstore library functions."); - abort(); + std::terminate(); }; } diff --git a/src/libstore/platform.cc b/src/libstore/platform.cc index f2c023c82..36f8e352a 100644 --- a/src/libstore/platform.cc +++ b/src/libstore/platform.cc @@ -25,7 +25,7 @@ std::shared_ptr<LocalStore> LocalStore::makeLocalStore(const Params & params) #endif } -std::shared_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoal( +std::unique_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoal( const StorePath & drvPath, const OutputsSpec & wantedOutputs, Worker & worker, @@ -34,17 +34,17 @@ std::shared_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoa ) { #if __linux__ - return std::make_shared<LinuxLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); + return std::make_unique<LinuxLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); #elif __APPLE__ - return std::make_shared<DarwinLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); + return std::make_unique<DarwinLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); #elif __FreeBSD__ - return std::make_shared<FreeBSDLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); + return std::make_unique<FreeBSDLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); #else - return std::make_shared<FallbackLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); + return std::make_unique<FallbackLocalDerivationGoal>(drvPath, wantedOutputs, worker, isDependency, buildMode); #endif } -std::shared_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoal( +std::unique_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoal( const StorePath & drvPath, const BasicDerivation & drv, const OutputsSpec & wantedOutputs, @@ -54,19 +54,19 @@ std::shared_ptr<LocalDerivationGoal> LocalDerivationGoal::makeLocalDerivationGoa ) { #if __linux__ - return std::make_shared<LinuxLocalDerivationGoal>( + return std::make_unique<LinuxLocalDerivationGoal>( drvPath, drv, wantedOutputs, worker, isDependency, buildMode ); #elif __APPLE__ - return std::make_shared<DarwinLocalDerivationGoal>( + return std::make_unique<DarwinLocalDerivationGoal>( drvPath, drv, wantedOutputs, worker, isDependency, buildMode ); #elif __FreeBSD__ - return std::make_shared<FreeBSDLocalDerivationGoal>( + return std::make_unique<FreeBSDLocalDerivationGoal>( drvPath, drv, wantedOutputs, worker, isDependency, buildMode ); #else - return std::make_shared<FallbackLocalDerivationGoal>( + return std::make_unique<FallbackLocalDerivationGoal>( drvPath, drv, wantedOutputs, worker, isDependency, buildMode ); #endif diff --git a/src/libutil/async-collect.hh b/src/libutil/async-collect.hh new file mode 100644 index 000000000..9e0b8bad9 --- /dev/null +++ b/src/libutil/async-collect.hh @@ -0,0 +1,101 @@ +#pragma once +/// @file + +#include <kj/async.h> +#include <kj/common.h> +#include <kj/vector.h> +#include <list> +#include <optional> +#include <type_traits> + +namespace nix { + +template<typename K, typename V> +class AsyncCollect +{ +public: + using Item = std::conditional_t<std::is_void_v<V>, K, std::pair<K, V>>; + +private: + kj::ForkedPromise<void> allPromises; + std::list<Item> results; + size_t remaining; + + kj::ForkedPromise<void> signal; + kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> notify; + + void oneDone(Item item) + { + results.emplace_back(std::move(item)); + remaining -= 1; + KJ_IF_MAYBE (n, notify) { + (*n)->fulfill(); + notify = nullptr; + } + } + + kj::Promise<void> collectorFor(K key, kj::Promise<V> promise) + { + if constexpr (std::is_void_v<V>) { + return promise.then([this, key{std::move(key)}] { oneDone(std::move(key)); }); + } else { + return promise.then([this, key{std::move(key)}](V v) { + oneDone(Item{std::move(key), std::move(v)}); + }); + } + } + + kj::ForkedPromise<void> waitForAll(kj::Array<std::pair<K, kj::Promise<V>>> & promises) + { + kj::Vector<kj::Promise<void>> wrappers; + for (auto & [key, promise] : promises) { + wrappers.add(collectorFor(std::move(key), std::move(promise))); + } + + return kj::joinPromisesFailFast(wrappers.releaseAsArray()).fork(); + } + +public: + AsyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> && promises) + : allPromises(waitForAll(promises)) + , remaining(promises.size()) + , signal{nullptr} + { + } + + kj::Promise<std::optional<Item>> next() + { + if (remaining == 0 && results.empty()) { + return {std::nullopt}; + } + + if (!results.empty()) { + auto result = std::move(results.front()); + results.pop_front(); + return {{std::move(result)}}; + } + + if (notify == nullptr) { + auto pair = kj::newPromiseAndFulfiller<void>(); + notify = std::move(pair.fulfiller); + signal = pair.promise.fork(); + } + + return signal.addBranch().exclusiveJoin(allPromises.addBranch()).then([this] { + return next(); + }); + } +}; + +/** + * Collect the results of a list of promises, in order of completion. + * Once any input promise is rejected all promises that have not been + * resolved or rejected will be cancelled and the exception rethrown. + */ +template<typename K, typename V> +AsyncCollect<K, V> asyncCollect(kj::Array<std::pair<K, kj::Promise<V>>> promises) +{ + return AsyncCollect<K, V>(std::move(promises)); +} + +} diff --git a/src/libutil/async-semaphore.hh b/src/libutil/async-semaphore.hh new file mode 100644 index 000000000..f8db31a68 --- /dev/null +++ b/src/libutil/async-semaphore.hh @@ -0,0 +1,122 @@ +#pragma once +/// @file +/// @brief A semaphore implementation usable from within a KJ event loop. + +#include <cassert> +#include <kj/async.h> +#include <kj/common.h> +#include <kj/exception.h> +#include <kj/list.h> +#include <kj/source-location.h> +#include <memory> +#include <optional> + +namespace nix { + +class AsyncSemaphore +{ +public: + class [[nodiscard("destroying a semaphore guard releases the semaphore immediately")]] Token + { + struct Release + { + void operator()(AsyncSemaphore * sem) const + { + sem->unsafeRelease(); + } + }; + + std::unique_ptr<AsyncSemaphore, Release> parent; + + public: + Token() = default; + Token(AsyncSemaphore & parent, kj::Badge<AsyncSemaphore>) : parent(&parent) {} + + bool valid() const + { + return parent != nullptr; + } + }; + +private: + struct Waiter + { + kj::PromiseFulfiller<Token> & fulfiller; + kj::ListLink<Waiter> link; + kj::List<Waiter, &Waiter::link> & list; + + Waiter(kj::PromiseFulfiller<Token> & fulfiller, kj::List<Waiter, &Waiter::link> & list) + : fulfiller(fulfiller) + , list(list) + { + list.add(*this); + } + + ~Waiter() + { + if (link.isLinked()) { + list.remove(*this); + } + } + }; + + const unsigned capacity_; + unsigned used_ = 0; + kj::List<Waiter, &Waiter::link> waiters; + + void unsafeRelease() + { + used_ -= 1; + while (used_ < capacity_ && !waiters.empty()) { + used_ += 1; + auto & w = waiters.front(); + w.fulfiller.fulfill(Token{*this, {}}); + waiters.remove(w); + } + } + +public: + explicit AsyncSemaphore(unsigned capacity) : capacity_(capacity) {} + + KJ_DISALLOW_COPY_AND_MOVE(AsyncSemaphore); + + ~AsyncSemaphore() + { + assert(waiters.empty() && "destroyed a semaphore with active waiters"); + } + + std::optional<Token> tryAcquire() + { + if (used_ < capacity_) { + used_ += 1; + return Token{*this, {}}; + } else { + return {}; + } + } + + kj::Promise<Token> acquire() + { + if (auto t = tryAcquire()) { + return std::move(*t); + } else { + return kj::newAdaptedPromise<Token, Waiter>(waiters); + } + } + + unsigned capacity() const + { + return capacity_; + } + + unsigned used() const + { + return used_; + } + + unsigned available() const + { + return capacity_ - used_; + } +}; +} diff --git a/src/libutil/fmt.hh b/src/libutil/fmt.hh index ee3e1e2e7..5feefdf90 100644 --- a/src/libutil/fmt.hh +++ b/src/libutil/fmt.hh @@ -136,11 +136,17 @@ inline std::string fmt(const char * s) template<typename... Args> inline std::string fmt(const std::string & fs, const Args &... args) -{ +try { boost::format f(fs); fmt_internal::setExceptions(f); (f % ... % args); return f.str(); +} catch (boost::io::format_error & fe) { + // I don't care who catches this, we do not put up with boost format errors + // Give me a stack trace and a core dump + std::cerr << "nix::fmt threw format error. Original format string: '"; + std::cerr << fs << "'; number of arguments: " << sizeof...(args) << "\n"; + std::terminate(); } /** @@ -174,15 +180,13 @@ public: std::cerr << "HintFmt received incorrect number of format args. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; // And regardless of the coredump give me a damn stacktrace. - printStackTrace(); - abort(); + std::terminate(); } } catch (boost::io::format_error & ex) { // Same thing, but for anything that happens in the member initializers. std::cerr << "HintFmt received incorrect format string. Original format string: '"; std::cerr << format << "'; number of arguments: " << sizeof...(args) << "\n"; - printStackTrace(); - abort(); + std::terminate(); } HintFmt(const HintFmt & hf) : fmt(hf.fmt) {} diff --git a/src/libutil/meson.build b/src/libutil/meson.build index a3f21de59..afca4e021 100644 --- a/src/libutil/meson.build +++ b/src/libutil/meson.build @@ -53,6 +53,8 @@ libutil_headers = files( 'archive.hh', 'args/root.hh', 'args.hh', + 'async-collect.hh', + 'async-semaphore.hh', 'backed-string-view.hh', 'box_ptr.hh', 'canon-path.hh', diff --git a/src/nix-channel/meson.build b/src/nix-channel/meson.build index 952dfdb78..97b92d789 100644 --- a/src/nix-channel/meson.build +++ b/src/nix-channel/meson.build @@ -1,5 +1 @@ -configure_file( - input : 'unpack-channel.nix', - output : 'unpack-channel.nix', - copy : true, -) +fs.copyfile('unpack-channel.nix') diff --git a/src/nix/fmt.cc b/src/nix/fmt.cc index 059904150..f47f2204a 100644 --- a/src/nix/fmt.cc +++ b/src/nix/fmt.cc @@ -39,14 +39,8 @@ struct CmdFmt : SourceExprCommand { Strings programArgs{app.program}; // Propagate arguments from the CLI - if (args.empty()) { - // Format the current flake out of the box - programArgs.push_back("."); - } else { - // User wants more power, let them decide which paths to include/exclude - for (auto &i : args) { - programArgs.push_back(i); - } + for (auto &i : args) { + programArgs.push_back(i); } runProgramInStore(store, UseSearchPath::DontUse, app.program, programArgs); diff --git a/tests/functional/build.sh b/tests/functional/build.sh index 356985a64..a14f6e3c2 100644 --- a/tests/functional/build.sh +++ b/tests/functional/build.sh @@ -146,11 +146,8 @@ out="$(nix build -f fod-failing.nix -L 2>&1)" && status=0 || status=$? test "$status" = 1 # one "hash mismatch" error, one "build of ... failed" test "$(<<<"$out" grep -E '^error:' | wc -l)" = 2 -<<<"$out" grepQuiet -E "hash mismatch in fixed-output derivation '.*-x1\\.drv'" -<<<"$out" grepQuiet -vE "hash mismatch in fixed-output derivation '.*-x3\\.drv'" -<<<"$out" grepQuiet -vE "hash mismatch in fixed-output derivation '.*-x2\\.drv'" -<<<"$out" grepQuiet -E "likely URL: https://meow.puppy.forge/puppy.tar.gz" -<<<"$out" grepQuiet -vE "likely URL: https://kitty.forge/cat.tar.gz" +<<<"$out" grepQuiet -E "hash mismatch in fixed-output derivation '.*-x.\\.drv'" +<<<"$out" grepQuiet -E "likely URL: " <<<"$out" grepQuiet -E "error: build of '.*-x[1-4]\\.drv\\^out', '.*-x[1-4]\\.drv\\^out', '.*-x[1-4]\\.drv\\^out', '.*-x[1-4]\\.drv\\^out' failed" out="$(nix build -f fod-failing.nix -L x1 x2 x3 --keep-going 2>&1)" && status=0 || status=$? @@ -167,9 +164,9 @@ test "$(<<<"$out" grep -E '^error:' | wc -l)" = 4 out="$(nix build -f fod-failing.nix -L x4 2>&1)" && status=0 || status=$? test "$status" = 1 -test "$(<<<"$out" grep -E '^error:' | wc -l)" = 2 -<<<"$out" grepQuiet -E "error: 1 dependencies of derivation '.*-x4\\.drv' failed to build" -<<<"$out" grepQuiet -E "hash mismatch in fixed-output derivation '.*-x2\\.drv'" +test "$(<<<"$out" grep -E '^error:' | wc -l)" -ge 2 +<<<"$out" grepQuiet -E "error: [12] dependencies of derivation '.*-x4\\.drv' failed to build" +<<<"$out" grepQuiet -E "hash mismatch in fixed-output derivation '.*-x[23]\\.drv'" out="$(nix build -f fod-failing.nix -L x4 --keep-going 2>&1)" && status=0 || status=$? test "$status" = 1 diff --git a/tests/functional/fetchGit.sh b/tests/functional/fetchGit.sh index 2c00facc2..492c57602 100644 --- a/tests/functional/fetchGit.sh +++ b/tests/functional/fetchGit.sh @@ -53,8 +53,17 @@ out=$(nix eval --impure --raw --expr "builtins.fetchGit { url = \"file://$repo\" [[ $status == 1 ]] [[ $out =~ 'Cannot find Git revision' ]] +# allow revs as refs (for 2.3 compat) [[ $(nix eval --raw --expr "builtins.readFile (builtins.fetchGit { url = \"file://$repo\"; rev = \"$devrev\"; allRefs = true; } + \"/differentbranch\")") = 'different file' ]] +rm -rf "$TEST_ROOT/test-home" +[[ $(nix eval --raw --expr "builtins.readFile (builtins.fetchGit { url = \"file://$repo\"; rev = \"$devrev\"; allRefs = true; } + \"/differentbranch\")") = 'different file' ]] + +rm -rf "$TEST_ROOT/test-home" +out=$(nix eval --raw --expr "builtins.readFile (builtins.fetchGit { url = \"file://$repo\"; rev = \"$devrev\"; ref = \"lolkek\"; } + \"/differentbranch\")" 2>&1) || status=$? +[[ $status == 1 ]] +[[ $out =~ 'Cannot find Git revision' ]] + # In pure eval mode, fetchGit without a revision should fail. [[ $(nix eval --impure --raw --expr "builtins.readFile (fetchGit \"file://$repo\" + \"/hello\")") = world ]] (! nix eval --raw --expr "builtins.readFile (fetchGit \"file://$repo\" + \"/hello\")") @@ -228,6 +237,12 @@ export _NIX_FORCE_HTTP=1 rev_tag1_nix=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"refs/tags/tag1\"; }).rev") rev_tag1=$(git -C $repo rev-parse refs/tags/tag1) [[ $rev_tag1_nix = $rev_tag1 ]] + +# Allow fetching tags w/o specifying refs/tags +rm -rf "$TEST_ROOT/test-home" +rev_tag1_nix_alt=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"tag1\"; }).rev") +[[ $rev_tag1_nix_alt = $rev_tag1 ]] + rev_tag2_nix=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"refs/tags/tag2\"; }).rev") rev_tag2=$(git -C $repo rev-parse refs/tags/tag2) [[ $rev_tag2_nix = $rev_tag2 ]] @@ -254,3 +269,33 @@ git -C "$repo" add hello .gitignore git -C "$repo" commit -m 'Bla1' cd "$repo" path11=$(nix eval --impure --raw --expr "(builtins.fetchGit ./.).outPath") + +# test behavior if both branch and tag with same name exist +repo="$TEST_ROOT/git" +rm -rf "$repo"/.git +git init "$repo" +git -C "$repo" config user.email "foobar@example.com" +git -C "$repo" config user.name "Foobar" + +touch "$repo"/test +echo "hello world" > "$repo"/test +git -C "$repo" checkout -b branch +git -C "$repo" add test + +git -C "$repo" commit -m "Init" + +git -C "$repo" tag branch + +echo "goodbye world" > "$repo"/test +git -C "$repo" add test +git -C "$repo" commit -m "Update test" + +path12=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"branch\"; }).outPath") +[[ "$(cat "$path12"/test)" =~ 'hello world' ]] +[[ "$(cat "$repo"/test)" =~ 'goodbye world' ]] + +path13=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"refs/heads/branch\"; }).outPath") +[[ "$(cat "$path13"/test)" =~ 'goodbye world' ]] + +path14=$(nix eval --impure --raw --expr "(builtins.fetchGit { url = \"file://$repo\"; ref = \"refs/tags/branch\"; }).outPath") +[[ "$path14" = "$path12" ]] diff --git a/tests/functional/fmt.sh b/tests/functional/fmt.sh index 3c1bd9989..7d6add9b6 100644 --- a/tests/functional/fmt.sh +++ b/tests/functional/fmt.sh @@ -26,7 +26,10 @@ cat << EOF > flake.nix }; } EOF -nix fmt ./file ./folder | grep 'Formatting: ./file ./folder' +# No arguments check +[[ "$(nix fmt)" = "Formatting(0):" ]] +# Argument forwarding check +nix fmt ./file ./folder | grep 'Formatting(2): ./file ./folder' nix flake check nix flake show | grep -P "package 'formatter'" diff --git a/tests/functional/fmt.simple.sh b/tests/functional/fmt.simple.sh index 03109a655..f655846ca 100755 --- a/tests/functional/fmt.simple.sh +++ b/tests/functional/fmt.simple.sh @@ -1,3 +1,3 @@ #!/usr/bin/env bash -echo Formatting: "${@}" +echo "Formatting(${#}):" "${@}" diff --git a/tests/nixos/default.nix b/tests/nixos/default.nix index 20e66f6c1..2d6eaed16 100644 --- a/tests/nixos/default.nix +++ b/tests/nixos/default.nix @@ -157,4 +157,6 @@ in coredumps = runNixOSTestFor "x86_64-linux" ./coredumps; io_uring = runNixOSTestFor "x86_64-linux" ./io_uring; + + fetchurl = runNixOSTestFor "x86_64-linux" ./fetchurl.nix; } diff --git a/tests/nixos/fetchurl.nix b/tests/nixos/fetchurl.nix new file mode 100644 index 000000000..97365d053 --- /dev/null +++ b/tests/nixos/fetchurl.nix @@ -0,0 +1,84 @@ +# Test whether builtin:fetchurl properly performs TLS certificate +# checks on HTTPS servers. + +{ lib, config, pkgs, ... }: + +let + + makeTlsCert = name: pkgs.runCommand name { + nativeBuildInputs = with pkgs; [ openssl ]; + } '' + mkdir -p $out + openssl req -x509 \ + -subj '/CN=${name}/' -days 49710 \ + -addext 'subjectAltName = DNS:${name}' \ + -keyout "$out/key.pem" -newkey ed25519 \ + -out "$out/cert.pem" -noenc + ''; + + goodCert = makeTlsCert "good"; + badCert = makeTlsCert "bad"; + +in + +{ + name = "fetchurl"; + + nodes = { + machine = { lib, pkgs, ... }: { + services.nginx = { + enable = true; + + virtualHosts."good" = { + addSSL = true; + sslCertificate = "${goodCert}/cert.pem"; + sslCertificateKey = "${goodCert}/key.pem"; + root = pkgs.runCommand "nginx-root" {} '' + mkdir "$out" + echo 'hello world' > "$out/index.html" + ''; + }; + + virtualHosts."bad" = { + addSSL = true; + sslCertificate = "${badCert}/cert.pem"; + sslCertificateKey = "${badCert}/key.pem"; + root = pkgs.runCommand "nginx-root" {} '' + mkdir "$out" + echo 'foobar' > "$out/index.html" + ''; + }; + }; + + security.pki.certificateFiles = [ "${goodCert}/cert.pem" ]; + + networking.hosts."127.0.0.1" = [ "good" "bad" ]; + + virtualisation.writableStore = true; + + nix.settings.experimental-features = "nix-command"; + }; + }; + + testScript = { nodes, ... }: '' + machine.wait_for_unit("nginx") + machine.wait_for_open_port(443) + + out = machine.succeed("curl https://good/index.html") + assert out == "hello world\n" + + out = machine.succeed("cat ${badCert}/cert.pem > /tmp/cafile.pem; curl --cacert /tmp/cafile.pem https://bad/index.html") + assert out == "foobar\n" + + # Fetching from a server with a trusted cert should work. + machine.succeed("nix build --no-substitute --expr 'import <nix/fetchurl.nix> { url = \"https://good/index.html\"; hash = \"sha256-qUiQTy8PR5uPgZdpSzAYSw0u0cHNKh7A+4XSmaGSpEc=\"; }'") + + # Fetching from a server with an untrusted cert should fail. + err = machine.fail("nix build --no-substitute --expr 'import <nix/fetchurl.nix> { url = \"https://bad/index.html\"; hash = \"sha256-rsBwZF/lPuOzdjBZN2E08FjMM3JHyXit0Xi2zN+wAZ8=\"; }' 2>&1") + print(err) + assert "SSL certificate problem: self-signed certificate" in err or "SSL peer certificate or SSH remote key was not OK" in err + + # Fetching from a server with a trusted cert should work via environment variable override. + machine.succeed("NIX_SSL_CERT_FILE=/tmp/cafile.pem nix build --no-substitute --expr 'import <nix/fetchurl.nix> { url = \"https://bad/index.html\"; hash = \"sha256-rsBwZF/lPuOzdjBZN2E08FjMM3JHyXit0Xi2zN+wAZ8=\"; }'") + ''; +} diff --git a/tests/unit/libmain/crash.cc b/tests/unit/libmain/crash.cc new file mode 100644 index 000000000..883dc39bd --- /dev/null +++ b/tests/unit/libmain/crash.cc @@ -0,0 +1,56 @@ +#include <gtest/gtest.h> +#include "crash-handler.hh" + +namespace nix { + +class OopsException : public std::exception +{ + const char * msg; + +public: + OopsException(const char * msg) : msg(msg) {} + const char * what() const noexcept override + { + return msg; + } +}; + +void causeCrashForTesting(std::function<void()> fixture) +{ + registerCrashHandler(); + std::cerr << "time to crash\n"; + try { + fixture(); + } catch (...) { + std::terminate(); + } +} + +TEST(CrashHandler, exceptionName) +{ + ASSERT_DEATH( + causeCrashForTesting([]() { throw OopsException{"lol oops"}; }), + "time to crash\nLix crashed.*OopsException: lol oops" + ); +} + +TEST(CrashHandler, unknownTerminate) +{ + ASSERT_DEATH( + causeCrashForTesting([]() { std::terminate(); }), + "time to crash\nLix crashed.*std::terminate\\(\\) called without exception" + ); +} + +TEST(CrashHandler, nonStdException) +{ + ASSERT_DEATH( + causeCrashForTesting([]() { + // NOLINTNEXTLINE(hicpp-exception-baseclass): intentional + throw 4; + }), + "time to crash\nLix crashed.*Unknown exception! Spooky\\." + ); +} + +} diff --git a/tests/unit/libutil/async-collect.cc b/tests/unit/libutil/async-collect.cc new file mode 100644 index 000000000..770374d21 --- /dev/null +++ b/tests/unit/libutil/async-collect.cc @@ -0,0 +1,104 @@ +#include "async-collect.hh" + +#include <gtest/gtest.h> +#include <kj/array.h> +#include <kj/async.h> +#include <kj/exception.h> +#include <stdexcept> + +namespace nix { + +TEST(AsyncCollect, void) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto a = kj::newPromiseAndFulfiller<void>(); + auto b = kj::newPromiseAndFulfiller<void>(); + auto c = kj::newPromiseAndFulfiller<void>(); + auto d = kj::newPromiseAndFulfiller<void>(); + + auto collect = asyncCollect(kj::arr( + std::pair(1, std::move(a.promise)), + std::pair(2, std::move(b.promise)), + std::pair(3, std::move(c.promise)), + std::pair(4, std::move(d.promise)) + )); + + auto p = collect.next(); + ASSERT_FALSE(p.poll(waitScope)); + + // collection is ordered + c.fulfiller->fulfill(); + b.fulfiller->fulfill(); + + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_EQ(p.wait(waitScope), 3); + + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_EQ(p.wait(waitScope), 2); + + p = collect.next(); + ASSERT_FALSE(p.poll(waitScope)); + + // exceptions propagate + a.fulfiller->rejectIfThrows([] { throw std::runtime_error("test"); }); + + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_THROW(p.wait(waitScope), kj::Exception); + + // first exception aborts collection + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_THROW(p.wait(waitScope), kj::Exception); +} + +TEST(AsyncCollect, nonVoid) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto a = kj::newPromiseAndFulfiller<int>(); + auto b = kj::newPromiseAndFulfiller<int>(); + auto c = kj::newPromiseAndFulfiller<int>(); + auto d = kj::newPromiseAndFulfiller<int>(); + + auto collect = asyncCollect(kj::arr( + std::pair(1, std::move(a.promise)), + std::pair(2, std::move(b.promise)), + std::pair(3, std::move(c.promise)), + std::pair(4, std::move(d.promise)) + )); + + auto p = collect.next(); + ASSERT_FALSE(p.poll(waitScope)); + + // collection is ordered + c.fulfiller->fulfill(1); + b.fulfiller->fulfill(2); + + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_EQ(p.wait(waitScope), std::pair(3, 1)); + + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_EQ(p.wait(waitScope), std::pair(2, 2)); + + p = collect.next(); + ASSERT_FALSE(p.poll(waitScope)); + + // exceptions propagate + a.fulfiller->rejectIfThrows([] { throw std::runtime_error("test"); }); + + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_THROW(p.wait(waitScope), kj::Exception); + + // first exception aborts collection + p = collect.next(); + ASSERT_TRUE(p.poll(waitScope)); + ASSERT_THROW(p.wait(waitScope), kj::Exception); +} +} diff --git a/tests/unit/libutil/async-semaphore.cc b/tests/unit/libutil/async-semaphore.cc new file mode 100644 index 000000000..12b52885d --- /dev/null +++ b/tests/unit/libutil/async-semaphore.cc @@ -0,0 +1,74 @@ +#include "async-semaphore.hh" + +#include <gtest/gtest.h> +#include <kj/async.h> + +namespace nix { + +TEST(AsyncSemaphore, counting) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(2); + + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + auto b = kj::evalNow([&] { return sem.acquire(); }); + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + + auto c = kj::evalNow([&] { return sem.acquire(); }); + auto d = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_TRUE(b.poll(waitScope)); + ASSERT_FALSE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + a = nullptr; + ASSERT_TRUE(c.poll(waitScope)); + ASSERT_FALSE(d.poll(waitScope)); + + { + auto lock = b.wait(waitScope); + ASSERT_FALSE(d.poll(waitScope)); + } + + ASSERT_TRUE(d.poll(waitScope)); + + ASSERT_EQ(sem.available(), 0); + ASSERT_EQ(sem.used(), 2); + c = nullptr; + ASSERT_EQ(sem.available(), 1); + ASSERT_EQ(sem.used(), 1); + d = nullptr; + ASSERT_EQ(sem.available(), 2); + ASSERT_EQ(sem.used(), 0); +} + +TEST(AsyncSemaphore, cancelledWaiter) +{ + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + AsyncSemaphore sem(1); + + auto a = kj::evalNow([&] { return sem.acquire(); }); + auto b = kj::evalNow([&] { return sem.acquire(); }); + auto c = kj::evalNow([&] { return sem.acquire(); }); + + ASSERT_TRUE(a.poll(waitScope)); + ASSERT_FALSE(b.poll(waitScope)); + + b = nullptr; + a = nullptr; + + ASSERT_TRUE(c.poll(waitScope)); +} + +} diff --git a/tests/unit/meson.build b/tests/unit/meson.build index 8ff0b5ec5..8b0c66dd8 100644 --- a/tests/unit/meson.build +++ b/tests/unit/meson.build @@ -39,6 +39,8 @@ liblixutil_test_support = declare_dependency( ) libutil_tests_sources = files( + 'libutil/async-collect.cc', + 'libutil/async-semaphore.cc', 'libutil/canon-path.cc', 'libutil/checked-arithmetic.cc', 'libutil/chunked-vector.cc', @@ -76,6 +78,7 @@ libutil_tester = executable( liblixexpr_mstatic, liblixutil_test_support, nlohmann_json, + kj, ], cpp_pch : cpp_pch, ) @@ -262,9 +265,14 @@ test( protocol : 'gtest', ) +libmain_tests_sources = files( + 'libmain/crash.cc', + 'libmain/progress-bar.cc', +) + libmain_tester = executable( 'liblixmain-tests', - files('libmain/progress-bar.cc'), + libmain_tests_sources, dependencies : [ liblixmain, liblixexpr, |