aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2020-02-14 22:34:14 +0100
committerEelco Dolstra <edolstra@gmail.com>2020-02-14 23:05:49 +0100
commit4c242639673a832f5b2dd3439205811c42992bb4 (patch)
treea3a9de0ed36cce513337d42d7baa4599b6b8c3d6
parente375da6899a939d5b987eff8a5a85fd083b24849 (diff)
nix eval-hydra-jobs: Support parallel evaluation
Example usage: $ nix eval-hydra-jobs -f '<nixpkgs/pkgs/top-level/release.nix>' '' \ --max-memory-size 2048 --workers 8
-rw-r--r--src/nix/eval-hydra-jobs.cc327
1 files changed, 235 insertions, 92 deletions
diff --git a/src/nix/eval-hydra-jobs.cc b/src/nix/eval-hydra-jobs.cc
index dd3fcf8be..60e41c3eb 100644
--- a/src/nix/eval-hydra-jobs.cc
+++ b/src/nix/eval-hydra-jobs.cc
@@ -5,6 +5,10 @@
#include "common-args.hh"
#include "json.hh"
#include "get-drvs.hh"
+#include "attr-path.hh"
+
+#include <nlohmann/json.hpp>
+#include <sys/resource.h>
using namespace nix;
@@ -36,6 +40,8 @@ static std::string queryMetaStrings(EvalState & state, DrvInfo & drv, const stri
struct CmdEvalHydraJobs : MixJSON, MixDryRun, InstallableCommand
{
std::optional<Path> gcRootsDir;
+ size_t nrWorkers = 1;
+ size_t maxMemorySize = 4ULL * 1024;
CmdEvalHydraJobs()
{
@@ -44,6 +50,10 @@ struct CmdEvalHydraJobs : MixJSON, MixDryRun, InstallableCommand
.description("garbage collector roots directory")
.labels({"path"})
.dest(&gcRootsDir);
+
+ mkIntFlag(0, "workers", "number of concurrent worker processes", &nrWorkers);
+
+ mkIntFlag(0, "max-memory-size", "maximum memory usage per worker process (in MiB)", &maxMemorySize);
}
std::string description() override
@@ -61,11 +71,11 @@ struct CmdEvalHydraJobs : MixJSON, MixDryRun, InstallableCommand
};
}
- void run(ref<Store> store) override
+ void worker(AutoCloseFD & to, AutoCloseFD & from)
{
auto state = getEvalState();
- if (!gcRootsDir) warn("'--gc-roots-dir' not specified");
+ // FIXME: should re-open state->store.
if (dryRun) settings.readOnlyMode = true;
@@ -73,118 +83,251 @@ struct CmdEvalHydraJobs : MixJSON, MixDryRun, InstallableCommand
to the environment. */
evalSettings.restrictEval = true;
- auto v = installable->toValue(*state).first;
+ auto autoArgs = getAutoArgs(*state);
- auto jsonObj = json ? std::make_unique<JSONObject>(std::cout, true) : nullptr;
+ auto vTop = installable->toValue(*state).first;
- std::function<void(Value & vIn, const string & attrPath)> findJobs;
+ auto vRoot = state->allocValue();
+ state->autoCallFunction(*autoArgs, *vTop, *vRoot);
- auto autoArgs = getAutoArgs(*state);
+ while (true) {
+ /* Wait for the master to send us a job name. */
+ writeLine(to.get(), "next");
+
+ auto s = readLine(from.get());
+ if (s == "exit") break;
+ if (!hasPrefix(s, "do ")) abort();
+ std::string attrPath(s, 3);
+
+ debug("worker process %d at '%s'", getpid(), attrPath);
+
+ /* Evaluate it and send info back to the master. */
+ nlohmann::json reply;
- findJobs = [&](Value & vIn, const string & attrPath)
- {
try {
- Activity act(*logger, lvlInfo, actUnknown, fmt("evaluating '%s'", attrPath));
-
- checkInterrupt();
-
- auto v = state->allocValue();
- state->autoCallFunction(*autoArgs, vIn, *v);
-
- if (v->type == tAttrs) {
- auto drv = getDerivation(*state, *v, false);
-
- if (drv) {
-
- DrvInfo::Outputs outputs = drv->queryOutputs();
-
- if (drv->querySystem() == "unknown")
- throw EvalError("derivation must have a 'system' attribute");
-
- auto drvPath = drv->queryDrvPath();
-
- if (jsonObj) {
- auto res = jsonObj->object(attrPath);
- res.attr("nixName", drv->queryName());
- res.attr("system", drv->querySystem());
- res.attr("drvPath", drvPath);
- res.attr("description", drv->queryMetaString("description"));
- res.attr("license", queryMetaStrings(*state, *drv, "license", "shortName"));
- res.attr("homepage", drv->queryMetaString("homepage"));
- res.attr("maintainers", queryMetaStrings(*state, *drv, "maintainers", "email"));
- res.attr("schedulingPriority", drv->queryMetaInt("schedulingPriority", 100));
- res.attr("timeout", drv->queryMetaInt("timeout", 36000));
- res.attr("maxSilent", drv->queryMetaInt("maxSilent", 7200));
- res.attr("isChannel", drv->queryMetaBool("isHydraChannel", false));
-
- /* If this is an aggregate, then get its constituents. */
- auto a = v->attrs->get(state->symbols.create("_hydraAggregate"));
- if (a && state->forceBool(*a->value, *a->pos)) {
- auto a = v->attrs->get(state->symbols.create("constituents"));
- if (!a)
- throw EvalError("derivation must have a ‘constituents’ attribute");
- PathSet context;
- state->coerceToString(*a->pos, *a->value, context, true, false);
- PathSet drvs;
- for (auto & i : context)
- if (i.at(0) == '!') {
- size_t index = i.find("!", 1);
- drvs.insert(string(i, index + 1));
- }
- res.attr("constituents", concatStringsSep(" ", drvs));
+ auto v = findAlongAttrPath(*state, attrPath, *autoArgs, *vRoot).first;
+
+ state->forceValue(*v);
+
+ if (auto drv = getDerivation(*state, *v, false)) {
+
+ DrvInfo::Outputs outputs = drv->queryOutputs();
+
+ if (drv->querySystem() == "unknown")
+ throw EvalError("derivation must have a 'system' attribute");
+
+ auto drvPath = drv->queryDrvPath();
+
+ nlohmann::json job;
+
+ job["nixName"] = drv->queryName();
+ job["system"] =drv->querySystem();
+ job["drvPath"] = drvPath;
+ job["description"] = drv->queryMetaString("description");
+ job["license"] = queryMetaStrings(*state, *drv, "license", "shortName");
+ job["homepage"] = drv->queryMetaString("homepage");
+ job["maintainers"] = queryMetaStrings(*state, *drv, "maintainers", "email");
+ job["schedulingPriority"] = drv->queryMetaInt("schedulingPriority", 100);
+ job["timeout"] = drv->queryMetaInt("timeout", 36000);
+ job["maxSilent"] = drv->queryMetaInt("maxSilent", 7200);
+ job["isChannel"] = drv->queryMetaBool("isHydraChannel", false);
+
+ /* If this is an aggregate, then get its constituents. */
+ auto a = v->attrs->get(state->symbols.create("_hydraAggregate"));
+ if (a && state->forceBool(*a->value, *a->pos)) {
+ auto a = v->attrs->get(state->symbols.create("constituents"));
+ if (!a)
+ throw EvalError("derivation must have a ‘constituents’ attribute");
+ PathSet context;
+ state->coerceToString(*a->pos, *a->value, context, true, false);
+ PathSet drvs;
+ for (auto & i : context)
+ if (i.at(0) == '!') {
+ size_t index = i.find("!", 1);
+ drvs.insert(string(i, index + 1));
}
+ job["constituents"] = concatStringsSep(" ", drvs);
+ }
- /* Register the derivation as a GC root. !!! This
- registers roots for jobs that we may have already
- done. */
- auto localStore = state->store.dynamic_pointer_cast<LocalFSStore>();
- if (gcRootsDir && localStore) {
- Path root = *gcRootsDir + "/" + std::string(baseNameOf(drvPath));
- if (!pathExists(root))
- localStore->addPermRoot(localStore->parseStorePath(drvPath), root, false);
- }
+ /* Register the derivation as a GC root. !!! This
+ registers roots for jobs that we may have already
+ done. */
+ auto localStore = state->store.dynamic_pointer_cast<LocalFSStore>();
+ if (gcRootsDir && localStore) {
+ Path root = *gcRootsDir + "/" + std::string(baseNameOf(drvPath));
+ if (!pathExists(root))
+ localStore->addPermRoot(localStore->parseStorePath(drvPath), root, false);
+ }
- auto res2 = res.object("outputs");
- for (auto & j : outputs)
- res2.attr(j.first, j.second);
- } else
- std::cout << fmt("%d: %d\n", attrPath, drvPath);
+ nlohmann::json out;
+ for (auto & j : outputs)
+ out[j.first] = j.second;
+ job["outputs"] = std::move(out);
+
+ reply["job"] = std::move(job);
+ }
+ else if (v->type == tAttrs) {
+ auto attrs = nlohmann::json::array();
+ StringSet ss;
+ for (auto & i : v->attrs->lexicographicOrder()) {
+ std::string name(i->name);
+ if (name.find('.') != std::string::npos || name.find(' ') != std::string::npos) {
+ printError("skipping job with illegal name '%s'", name);
+ continue;
+ }
+ attrs.push_back(name);
}
+ reply["attrs"] = std::move(attrs);
+ }
+
+ } catch (EvalError & e) {
+ reply["error"] = filterANSIEscapes(e.msg(), true);
+ }
+
+ writeLine(to.get(), reply.dump());
- else {
- if (!state->isDerivation(*v)) {
- for (auto & i : v->attrs->lexicographicOrder()) {
- std::string name(i->name);
+ /* If our RSS exceeds the maximum, exit. The master will
+ start a new process. */
+ struct rusage r;
+ getrusage(RUSAGE_SELF, &r);
+ if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
+ }
- /* Skip jobs with dots in the name. */
- if (name.find('.') != std::string::npos) {
- printError("skipping job with illegal name '%s'", name);
- continue;
+ writeLine(to.get(), "restart");
+ }
+
+ void run(ref<Store> store) override
+ {
+ if (!gcRootsDir) warn("'--gc-roots-dir' not specified");
+
+ struct State
+ {
+ std::set<std::string> todo{""};
+ std::set<std::string> active;
+ nlohmann::json result;
+ };
+
+ std::condition_variable wakeup;
+
+ Sync<State> state_;
+
+ /* Start a handler thread per worker process. */
+ auto handler = [this, &state_, &wakeup]()
+ {
+ try {
+ pid_t pid = -1;
+ AutoCloseFD from, to;
+
+ while (true) {
+
+ /* Start a new worker process if necessary. */
+ if (pid == -1) {
+ Pipe toPipe, fromPipe;
+ toPipe.create();
+ fromPipe.create();
+ pid = startProcess(
+ [this,
+ to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
+ from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
+ ]()
+ {
+ try {
+ worker(*to, *from);
+ } catch (Error & e) {
+ printError("unexpected worker error: %s", e.msg());
+ _exit(1);
}
+ },
+ ProcessOptions { .allowVfork = false });
+ from = std::move(fromPipe.readSide);
+ to = std::move(toPipe.writeSide);
+ debug("created worker process %d", pid);
+ }
- findJobs(*i->value, (attrPath.empty() ? "" : attrPath + ".") + name);
- }
+ /* Check whether the existing worker process is still there. */
+ auto s = readLine(from.get());
+ if (s == "restart") {
+ pid = -1;
+ continue;
+ } else if (s != "next")
+ throw Error("unexpected worker request: %s", s);
+
+ /* Wait for a job name to become available. */
+ std::string attrPath;
+
+ while (true) {
+ checkInterrupt();
+ auto state(state_.lock());
+ if (state->todo.empty() && state->active.empty()) {
+ writeLine(to.get(), "exit");
+ return;
}
+ if (!state->todo.empty()) {
+ attrPath = *state->todo.begin();
+ state->todo.erase(state->todo.begin());
+ state->active.insert(attrPath);
+ break;
+ } else
+ state.wait(wakeup);
}
- }
- else if (v->type == tNull) {
- // allow null values, meaning 'do nothing'
- }
+ /* Tell the worker to evaluate it. */
+ writeLine(to.get(), "do " + attrPath);
- else
- throw TypeError("unsupported value: %s", *v);
+ /* Wait for the response. */
+ auto response = nlohmann::json::parse(readLine(from.get()));
- } catch (EvalError & e) {
- if (jsonObj)
- jsonObj->object(attrPath).attr("error", filterANSIEscapes(e.msg(), true));
- else
- printError("in job '%s': %s", attrPath, e.what());
+ /* Handle the response. */
+ StringSet newAttrs;
+
+ if (response.find("job") != response.end()) {
+ auto state(state_.lock());
+ if (json)
+ state->result[attrPath] = response["job"];
+ else
+ std::cout << fmt("%d: %d\n", attrPath, (std::string) response["job"]["drvPath"]);
+ }
+
+ if (response.find("attrs") != response.end()) {
+ for (auto & i : response["attrs"]) {
+ auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) i;
+ newAttrs.insert(s);
+ }
+ }
+
+ if (response.find("error") != response.end()) {
+ auto state(state_.lock());
+ if (json)
+ state->result[attrPath]["error"] = response["error"];
+ else
+ printError("error in job '%s': %s",
+ attrPath, (std::string) response["error"]);
+ }
+
+ /* Add newly discovered job names to the queue. */
+ {
+ auto state(state_.lock());
+ state->active.erase(attrPath);
+ for (auto & s : newAttrs)
+ state->todo.insert(s);
+ wakeup.notify_all();
+ }
+ }
+ } catch (Error & e) {
+ printError("unexpected handler thread error: %s", e.msg());
+ abort();
}
};
- findJobs(*v, "");
+ std::vector<std::thread> threads;
+ for (size_t i = 0; i < nrWorkers; i++)
+ threads.emplace_back(std::thread(handler));
+
+ for (auto & thread : threads)
+ thread.join();
+
+ if (json) std::cout << state_.lock()->result.dump(2) << "\n";
}
};