aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libmain/shared.cc1
-rw-r--r--src/libstore/remote-store.cc465
-rw-r--r--src/libstore/remote-store.hh29
-rw-r--r--src/libutil/pool.hh151
-rw-r--r--src/libutil/ref.hh67
-rw-r--r--src/libutil/serialise.cc22
-rw-r--r--src/libutil/serialise.hh23
-rw-r--r--src/libutil/sync.hh78
-rw-r--r--src/libutil/types.hh61
9 files changed, 584 insertions, 313 deletions
diff --git a/src/libmain/shared.cc b/src/libmain/shared.cc
index 8f2aa8420..c27302227 100644
--- a/src/libmain/shared.cc
+++ b/src/libmain/shared.cc
@@ -126,6 +126,7 @@ void initNix()
std::cerr.rdbuf()->pubsetbuf(buf, sizeof(buf));
#endif
+ // FIXME: do we need this? It's not thread-safe.
std::ios::sync_with_stdio(false);
if (getEnv("IN_SYSTEMD") == "1")
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index ab2ebb9ae..2f540c640 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -6,6 +6,7 @@
#include "affinity.hh"
#include "globals.hh"
#include "derivations.hh"
+#include "pool.hh"
#include <sys/types.h>
#include <sys/stat.h>
@@ -13,9 +14,8 @@
#include <sys/un.h>
#include <errno.h>
#include <fcntl.h>
-
-#include <iostream>
#include <unistd.h>
+
#include <cstring>
namespace nix {
@@ -39,62 +39,25 @@ template<class T> T readStorePaths(Source & from)
template PathSet readStorePaths(Source & from);
-RemoteStore::RemoteStore()
+RemoteStore::RemoteStore(size_t maxConnections)
+ : connections(make_ref<Pool<Connection>>(
+ maxConnections,
+ [this]() { return openConnection(); },
+ [](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
+ ))
{
- initialised = false;
}
-void RemoteStore::openConnection(bool reserveSpace)
+ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
{
- if (initialised) return;
- initialised = true;
+ auto conn = make_ref<Connection>();
/* Connect to a daemon that does the privileged work for us. */
- connectToDaemon();
-
- from.fd = fdSocket;
- to.fd = fdSocket;
-
- /* Send the magic greeting, check for the reply. */
- try {
- to << WORKER_MAGIC_1;
- to.flush();
- unsigned int magic = readInt(from);
- if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
-
- daemonVersion = readInt(from);
- if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
- throw Error("Nix daemon protocol version not supported");
- to << PROTOCOL_VERSION;
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
- int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
- if (cpu != -1)
- to << 1 << cpu;
- else
- to << 0;
- }
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
- to << reserveSpace;
-
- processStderr();
- }
- catch (Error & e) {
- throw Error(format("cannot start daemon worker: %1%") % e.msg());
- }
-
- setOptions();
-}
-
-
-void RemoteStore::connectToDaemon()
-{
- fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
- if (fdSocket == -1)
+ conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (conn->fd == -1)
throw SysError("cannot create Unix domain socket");
- closeOnExec(fdSocket);
+ closeOnExec(conn->fd);
string socketPath = settings.nixDaemonSocketFile;
@@ -111,111 +74,135 @@ void RemoteStore::connectToDaemon()
addr.sun_family = AF_UNIX;
if (socketPathRel.size() >= sizeof(addr.sun_path))
throw Error(format("socket path ‘%1%’ is too long") % socketPathRel);
- using namespace std;
strcpy(addr.sun_path, socketPathRel.c_str());
- if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1)
+ if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at ‘%1%’") % socketPath);
if (fchdir(fdPrevDir) == -1)
throw SysError("couldn't change back to previous directory");
-}
+ conn->from.fd = conn->fd;
+ conn->to.fd = conn->fd;
-RemoteStore::~RemoteStore()
-{
+ /* Send the magic greeting, check for the reply. */
try {
- to.flush();
- fdSocket.close();
- } catch (...) {
- ignoreException();
+ conn->to << WORKER_MAGIC_1;
+ conn->to.flush();
+ unsigned int magic = readInt(conn->from);
+ if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
+
+ conn->daemonVersion = readInt(conn->from);
+ if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
+ throw Error("Nix daemon protocol version not supported");
+ conn->to << PROTOCOL_VERSION;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
+ int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
+ if (cpu != -1)
+ conn->to << 1 << cpu;
+ else
+ conn->to << 0;
+ }
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
+ conn->to << reserveSpace;
+
+ conn->processStderr();
}
+ catch (Error & e) {
+ throw Error(format("cannot start daemon worker: %1%") % e.msg());
+ }
+
+ setOptions(conn);
+
+ return conn;
}
-void RemoteStore::setOptions()
+void RemoteStore::setOptions(ref<Connection> conn)
{
- to << wopSetOptions
+ conn->to << wopSetOptions
<< settings.keepFailed
<< settings.keepGoing
<< settings.tryFallback
<< verbosity
<< settings.maxBuildJobs
<< settings.maxSilentTime;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 2)
- to << settings.useBuildHook;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 4)
- to << settings.buildVerbosity
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2)
+ conn->to << settings.useBuildHook;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4)
+ conn->to << settings.buildVerbosity
<< logType
<< settings.printBuildTrace;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 6)
- to << settings.buildCores;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 10)
- to << settings.useSubstitutes;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6)
+ conn->to << settings.buildCores;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10)
+ conn->to << settings.useSubstitutes;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
Settings::SettingsMap overrides = settings.getOverrides();
if (overrides["ssh-auth-sock"] == "")
overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
- to << overrides.size();
+ conn->to << overrides.size();
for (auto & i : overrides)
- to << i.first << i.second;
+ conn->to << i.first << i.second;
}
- processStderr();
+ conn->processStderr();
}
bool RemoteStore::isValidPath(const Path & path)
{
- openConnection();
- to << wopIsValidPath << path;
- processStderr();
- unsigned int reply = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopIsValidPath << path;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
return reply != 0;
}
PathSet RemoteStore::queryValidPaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths)
if (isValidPath(i)) res.insert(i);
return res;
} else {
- to << wopQueryValidPaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQueryValidPaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
PathSet RemoteStore::queryAllValidPaths()
{
- openConnection();
- to << wopQueryAllValidPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryAllValidPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths) {
- to << wopHasSubstitutes << i;
- processStderr();
- if (readInt(from)) res.insert(i);
+ conn->to << wopHasSubstitutes << i;
+ conn->processStderr();
+ if (readInt(conn->from)) res.insert(i);
}
return res;
} else {
- to << wopQuerySubstitutablePaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQuerySubstitutablePaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
@@ -225,39 +212,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
{
if (paths.empty()) return;
- openConnection();
+ auto conn(connections->get());
- if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) {
SubstitutablePathInfo info;
- to << wopQuerySubstitutablePathInfo << i;
- processStderr();
- unsigned int reply = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfo << i;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0;
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
infos[i] = info;
}
} else {
- to << wopQuerySubstitutablePathInfos << paths;
- processStderr();
- unsigned int count = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfos << paths;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
for (unsigned int n = 0; n < count; n++) {
- Path path = readStorePath(from);
+ Path path = readStorePath(conn->from);
SubstitutablePathInfo & info(infos[path]);
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = readLongLong(from);
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = readLongLong(conn->from);
}
}
@@ -266,27 +253,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
{
- openConnection();
- to << wopQueryPathInfo << path;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopQueryPathInfo << path;
+ conn->processStderr();
ValidPathInfo info;
info.path = path;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.narHash = parseHash(htSHA256, readString(from));
- info.references = readStorePaths<PathSet>(from);
- info.registrationTime = readInt(from);
- info.narSize = readLongLong(from);
+ info.narHash = parseHash(htSHA256, readString(conn->from));
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.registrationTime = readInt(conn->from);
+ info.narSize = readLongLong(conn->from);
return info;
}
Hash RemoteStore::queryPathHash(const Path & path)
{
- openConnection();
- to << wopQueryPathHash << path;
- processStderr();
- string hash = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathHash << path;
+ conn->processStderr();
+ string hash = readString(conn->from);
return parseHash(htSHA256, hash);
}
@@ -294,10 +281,10 @@ Hash RemoteStore::queryPathHash(const Path & path)
void RemoteStore::queryReferences(const Path & path,
PathSet & references)
{
- openConnection();
- to << wopQueryReferences << path;
- processStderr();
- PathSet references2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferences << path;
+ conn->processStderr();
+ PathSet references2 = readStorePaths<PathSet>(conn->from);
references.insert(references2.begin(), references2.end());
}
@@ -305,20 +292,20 @@ void RemoteStore::queryReferences(const Path & path,
void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers)
{
- openConnection();
- to << wopQueryReferrers << path;
- processStderr();
- PathSet referrers2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferrers << path;
+ conn->processStderr();
+ PathSet referrers2 = readStorePaths<PathSet>(conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
Path RemoteStore::queryDeriver(const Path & path)
{
- openConnection();
- to << wopQueryDeriver << path;
- processStderr();
- Path drvPath = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDeriver << path;
+ conn->processStderr();
+ Path drvPath = readString(conn->from);
if (drvPath != "") assertStorePath(drvPath);
return drvPath;
}
@@ -326,37 +313,37 @@ Path RemoteStore::queryDeriver(const Path & path)
PathSet RemoteStore::queryValidDerivers(const Path & path)
{
- openConnection();
- to << wopQueryValidDerivers << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryValidDerivers << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputs << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputs << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputNames << path;
- processStderr();
- return readStrings<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputNames << path;
+ conn->processStderr();
+ return readStrings<PathSet>(conn->from);
}
Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{
- openConnection();
- to << wopQueryPathFromHashPart << hashPart;
- processStderr();
- Path path = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathFromHashPart << hashPart;
+ conn->processStderr();
+ Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path);
return path;
}
@@ -367,32 +354,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
+ auto conn(connections->get());
Path srcPath(absPath(_srcPath));
- to << wopAddToStore << name
+ conn->to << wopAddToStore << name
<< ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
<< (recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
- to.written = 0;
- to.warn = true;
- dumpPath(srcPath, to, filter);
- to.warn = false;
- processStderr();
+ conn->to.written = 0;
+ conn->to.warn = true;
+ dumpPath(srcPath, conn->to, filter);
+ conn->to.warn = false;
+ conn->processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
- processStderr();
+ conn->processStderr();
} catch (EndOfFile & e) { }
throw;
}
- return readStorePath(from);
+ return readStorePath(conn->from);
}
@@ -401,43 +388,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
- to << wopAddTextToStore << name << s << references;
+ auto conn(connections->get());
+ conn->to << wopAddTextToStore << name << s << references;
- processStderr();
- return readStorePath(from);
+ conn->processStderr();
+ return readStorePath(conn->from);
}
void RemoteStore::exportPath(const Path & path, bool sign,
Sink & sink)
{
- openConnection();
- to << wopExportPath << path << (sign ? 1 : 0);
- processStderr(&sink); /* sink receives the actual data */
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopExportPath << path << (sign ? 1 : 0);
+ conn->processStderr(&sink); /* sink receives the actual data */
+ readInt(conn->from);
}
Paths RemoteStore::importPaths(bool requireSignature, Source & source)
{
- openConnection();
- to << wopImportPaths;
+ auto conn(connections->get());
+ conn->to << wopImportPaths;
/* We ignore requireSignature, since the worker forces it to true
anyway. */
- processStderr(0, &source);
- return readStorePaths<Paths>(from);
+ conn->processStderr(0, &source);
+ return readStorePaths<Paths>(conn->from);
}
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{
- openConnection();
- to << wopBuildPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) {
- to << drvPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 15)
- to << buildMode;
+ auto conn(connections->get());
+ conn->to << wopBuildPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
+ conn->to << drvPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
+ conn->to << buildMode;
else
/* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */
@@ -449,22 +436,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
PathSet drvPaths2;
for (auto & i : drvPaths)
drvPaths2.insert(string(i, 0, i.find('!')));
- to << drvPaths2;
+ conn->to << drvPaths2;
}
- processStderr();
- readInt(from);
+ conn->processStderr();
+ readInt(conn->from);
}
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
- openConnection();
- to << wopBuildDerivation << drvPath << drv << buildMode;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopBuildDerivation << drvPath << drv << buildMode;
+ conn->processStderr();
BuildResult res;
unsigned int status;
- from >> status >> res.errorMsg;
+ conn->from >> status >> res.errorMsg;
res.status = (BuildResult::Status) status;
return res;
}
@@ -472,50 +459,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
void RemoteStore::ensurePath(const Path & path)
{
- openConnection();
- to << wopEnsurePath << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopEnsurePath << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addTempRoot(const Path & path)
{
- openConnection();
- to << wopAddTempRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddTempRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addIndirectRoot(const Path & path)
{
- openConnection();
- to << wopAddIndirectRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddIndirectRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::syncWithGC()
{
- openConnection();
- to << wopSyncWithGC;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopSyncWithGC;
+ conn->processStderr();
+ readInt(conn->from);
}
Roots RemoteStore::findRoots()
{
- openConnection();
- to << wopFindRoots;
- processStderr();
- unsigned int count = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopFindRoots;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
Roots result;
while (count--) {
- Path link = readString(from);
- Path target = readStorePath(from);
+ Path link = readString(conn->from);
+ Path target = readStorePath(conn->from);
result[link] = target;
}
return result;
@@ -524,56 +511,68 @@ Roots RemoteStore::findRoots()
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{
- openConnection(false);
+ auto conn(connections->get());
- to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
+ conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
<< options.maxFreed << 0;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 5)
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5)
/* removed options */
- to << 0 << 0;
+ conn->to << 0 << 0;
- processStderr();
+ conn->processStderr();
- results.paths = readStrings<PathSet>(from);
- results.bytesFreed = readLongLong(from);
- readLongLong(from); // obsolete
+ results.paths = readStrings<PathSet>(conn->from);
+ results.bytesFreed = readLongLong(conn->from);
+ readLongLong(conn->from); // obsolete
}
PathSet RemoteStore::queryFailedPaths()
{
- openConnection();
- to << wopQueryFailedPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryFailedPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
void RemoteStore::clearFailedPaths(const PathSet & paths)
{
- openConnection();
- to << wopClearFailedPaths << paths;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopClearFailedPaths << paths;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::optimiseStore()
{
- openConnection();
- to << wopOptimiseStore;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopOptimiseStore;
+ conn->processStderr();
+ readInt(conn->from);
}
bool RemoteStore::verifyStore(bool checkContents, bool repair)
{
- openConnection();
- to << wopVerifyStore << checkContents << repair;
- processStderr();
- return readInt(from) != 0;
+ auto conn(connections->get());
+ conn->to << wopVerifyStore << checkContents << repair;
+ conn->processStderr();
+ return readInt(conn->from) != 0;
+}
+
+
+RemoteStore::Connection::~Connection()
+{
+ try {
+ to.flush();
+ fd.close();
+ } catch (...) {
+ ignoreException();
+ }
}
-void RemoteStore::processStderr(Sink * sink, Source * source)
+
+void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
to.flush();
unsigned int msg;
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index f15182285..af417b84f 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -1,5 +1,6 @@
#pragma once
+#include <limits>
#include <string>
#include "store-api.hh"
@@ -12,15 +13,14 @@ class Pipe;
class Pid;
struct FdSink;
struct FdSource;
+template<typename T> class Pool;
class RemoteStore : public Store
{
public:
- RemoteStore();
-
- ~RemoteStore();
+ RemoteStore(size_t maxConnections = std::numeric_limits<size_t>::max());
/* Implementations of abstract store API methods. */
@@ -91,19 +91,24 @@ public:
bool verifyStore(bool checkContents, bool repair) override;
private:
- AutoCloseFD fdSocket;
- FdSink to;
- FdSource from;
- unsigned int daemonVersion;
- bool initialised;
- void openConnection(bool reserveSpace = true);
+ struct Connection
+ {
+ AutoCloseFD fd;
+ FdSink to;
+ FdSource from;
+ unsigned int daemonVersion;
+
+ ~Connection();
+
+ void processStderr(Sink * sink = 0, Source * source = 0);
+ };
- void processStderr(Sink * sink = 0, Source * source = 0);
+ ref<Pool<Connection>> connections;
- void connectToDaemon();
+ ref<Connection> openConnection(bool reserveSpace = true);
- void setOptions();
+ void setOptions(ref<Connection> conn);
};
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
new file mode 100644
index 000000000..f291cd578
--- /dev/null
+++ b/src/libutil/pool.hh
@@ -0,0 +1,151 @@
+#pragma once
+
+#include <functional>
+#include <limits>
+#include <list>
+#include <memory>
+#include <cassert>
+
+#include "sync.hh"
+#include "ref.hh"
+
+namespace nix {
+
+/* This template class implements a simple pool manager of resources
+ of some type R, such as database connections. It is used as
+ follows:
+
+ class Connection { ... };
+
+ Pool<Connection> pool;
+
+ {
+ auto conn(pool.get());
+ conn->exec("select ...");
+ }
+
+ Here, the Connection object referenced by ‘conn’ is automatically
+ returned to the pool when ‘conn’ goes out of scope.
+*/
+
+template <class R>
+class Pool
+{
+public:
+
+ /* A function that produces new instances of R on demand. */
+ typedef std::function<ref<R>()> Factory;
+
+ /* A function that checks whether an instance of R is still
+ usable. Unusable instances are removed from the pool. */
+ typedef std::function<bool(const ref<R> &)> Validator;
+
+private:
+
+ Factory factory;
+ Validator validator;
+
+ struct State
+ {
+ size_t inUse = 0;
+ size_t max;
+ std::vector<ref<R>> idle;
+ };
+
+ Sync<State> state;
+
+ std::condition_variable wakeup;
+
+public:
+
+ Pool(size_t max = std::numeric_limits<size_t>::max(),
+ const Factory & factory = []() { return make_ref<R>(); },
+ const Validator & validator = [](ref<R> r) { return true; })
+ : factory(factory)
+ , validator(validator)
+ {
+ auto state_(state.lock());
+ state_->max = max;
+ }
+
+ ~Pool()
+ {
+ auto state_(state.lock());
+ assert(!state_->inUse);
+ state_->max = 0;
+ state_->idle.clear();
+ }
+
+ class Handle
+ {
+ private:
+ Pool & pool;
+ std::shared_ptr<R> r;
+
+ friend Pool;
+
+ Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
+
+ public:
+ Handle(Handle && h) : pool(h.pool), r(h.r) { h.r.reset(); }
+
+ Handle(const Handle & l) = delete;
+
+ ~Handle()
+ {
+ if (!r) return;
+ {
+ auto state_(pool.state.lock());
+ state_->idle.push_back(ref<R>(r));
+ assert(state_->inUse);
+ state_->inUse--;
+ }
+ pool.wakeup.notify_one();
+ }
+
+ R * operator -> () { return &*r; }
+ R & operator * () { return *r; }
+ };
+
+ Handle get()
+ {
+ {
+ auto state_(state.lock());
+
+ /* If we're over the maximum number of instance, we need
+ to wait until a slot becomes available. */
+ while (state_->idle.empty() && state_->inUse >= state_->max)
+ state_.wait(wakeup);
+
+ while (!state_->idle.empty()) {
+ auto p = state_->idle.back();
+ state_->idle.pop_back();
+ if (validator(p)) {
+ state_->inUse++;
+ return Handle(*this, p);
+ }
+ }
+
+ state_->inUse++;
+ }
+
+ /* We need to create a new instance. Because that might take a
+ while, we don't hold the lock in the meantime. */
+ try {
+ Handle h(*this, factory());
+ return h;
+ } catch (...) {
+ auto state_(state.lock());
+ state_->inUse--;
+ throw;
+ }
+ }
+
+ unsigned int count()
+ {
+ auto state_(state.lock());
+ return state_->idle.size() + state_->inUse;
+ }
+};
+
+}
diff --git a/src/libutil/ref.hh b/src/libutil/ref.hh
new file mode 100644
index 000000000..a6d338d79
--- /dev/null
+++ b/src/libutil/ref.hh
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <memory>
+#include <exception>
+
+namespace nix {
+
+/* A simple non-nullable reference-counted pointer. Actually a wrapper
+ around std::shared_ptr that prevents non-null constructions. */
+template<typename T>
+class ref
+{
+private:
+
+ std::shared_ptr<T> p;
+
+public:
+
+ ref<T>(const ref<T> & r)
+ : p(r.p)
+ { }
+
+ explicit ref<T>(const std::shared_ptr<T> & p)
+ : p(p)
+ {
+ if (!p)
+ throw std::invalid_argument("null pointer cast to ref");
+ }
+
+ T* operator ->() const
+ {
+ return &*p;
+ }
+
+ T& operator *() const
+ {
+ return *p;
+ }
+
+ operator std::shared_ptr<T> ()
+ {
+ return p;
+ }
+
+ template<typename T2>
+ operator ref<T2> ()
+ {
+ return ref<T2>((std::shared_ptr<T2>) p);
+ }
+
+private:
+
+ template<typename T2, typename... Args>
+ friend ref<T2>
+ make_ref(Args&&... args);
+
+};
+
+template<typename T, typename... Args>
+inline ref<T>
+make_ref(Args&&... args)
+{
+ auto p = std::make_shared<T>(std::forward<Args>(args)...);
+ return ref<T>(p);
+}
+
+}
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index f136a1324..c9620e2bf 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len)
warned = true;
}
}
- writeFull(fd, data, len);
+ try {
+ writeFull(fd, data, len);
+ } catch (SysError & e) {
+ _good = true;
+ }
+}
+
+
+bool FdSink::good()
+{
+ return _good;
}
@@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
checkInterrupt();
n = ::read(fd, (char *) data, bufSize);
} while (n == -1 && errno == EINTR);
- if (n == -1) throw SysError("reading from file");
- if (n == 0) throw EndOfFile("unexpected end-of-file");
+ if (n == -1) { _good = false; throw SysError("reading from file"); }
+ if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); }
return n;
}
+bool FdSource::good()
+{
+ return _good;
+}
+
+
size_t StringSource::read(unsigned char * data, size_t len)
{
if (pos == s.size()) throw EndOfFile("end of string reached");
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 979ff849f..9e269f392 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -12,6 +12,7 @@ struct Sink
{
virtual ~Sink() { }
virtual void operator () (const unsigned char * data, size_t len) = 0;
+ virtual bool good() { return true; }
};
@@ -25,7 +26,7 @@ struct BufferedSink : Sink
: bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink();
- void operator () (const unsigned char * data, size_t len);
+ void operator () (const unsigned char * data, size_t len) override;
void flush();
@@ -47,6 +48,8 @@ struct Source
return the number of bytes stored. If blocks until at least
one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
+
+ virtual bool good() { return true; }
};
@@ -60,7 +63,7 @@ struct BufferedSource : Source
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource();
- size_t read(unsigned char * data, size_t len);
+ size_t read(unsigned char * data, size_t len) override;
/* Underlying read call, to be overridden. */
virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
@@ -80,7 +83,12 @@ struct FdSink : BufferedSink
FdSink(int fd) : fd(fd), warn(false), written(0) { }
~FdSink();
- void write(const unsigned char * data, size_t len);
+ void write(const unsigned char * data, size_t len) override;
+
+ bool good() override;
+
+private:
+ bool _good = true;
};
@@ -90,7 +98,10 @@ struct FdSource : BufferedSource
int fd;
FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { }
- size_t readUnbuffered(unsigned char * data, size_t len);
+ size_t readUnbuffered(unsigned char * data, size_t len) override;
+ bool good() override;
+private:
+ bool _good = true;
};
@@ -98,7 +109,7 @@ struct FdSource : BufferedSource
struct StringSink : Sink
{
string s;
- void operator () (const unsigned char * data, size_t len);
+ void operator () (const unsigned char * data, size_t len) override;
};
@@ -108,7 +119,7 @@ struct StringSource : Source
const string & s;
size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { }
- size_t read(unsigned char * data, size_t len);
+ size_t read(unsigned char * data, size_t len) override;
};
diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh
new file mode 100644
index 000000000..c99c098ac
--- /dev/null
+++ b/src/libutil/sync.hh
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+namespace nix {
+
+/* This template class ensures synchronized access to a value of type
+ T. It is used as follows:
+
+ struct Data { int x; ... };
+
+ Sync<Data> data;
+
+ {
+ auto data_(data.lock());
+ data_->x = 123;
+ }
+
+ Here, "data" is automatically unlocked when "data_" goes out of
+ scope.
+*/
+
+template<class T>
+class Sync
+{
+private:
+ std::mutex mutex;
+ T data;
+
+public:
+
+ Sync() { }
+ Sync(const T & data) : data(data) { }
+
+ class Lock
+ {
+ private:
+ Sync * s;
+ std::unique_lock<std::mutex> lk;
+ friend Sync;
+ Lock(Sync * s) : s(s), lk(s->mutex) { }
+ public:
+ Lock(Lock && l) : s(l.s) { abort(); }
+ Lock(const Lock & l) = delete;
+ ~Lock() { }
+ T * operator -> () { return &s->data; }
+ T & operator * () { return s->data; }
+
+ void wait(std::condition_variable & cv)
+ {
+ assert(s);
+ cv.wait(lk);
+ }
+
+ template<class Rep, class Period, class Predicate>
+ bool wait_for(std::condition_variable & cv,
+ const std::chrono::duration<Rep, Period> & duration,
+ Predicate pred)
+ {
+ assert(s);
+ return cv.wait_for(lk, duration, pred);
+ }
+
+ template<class Clock, class Duration>
+ std::cv_status wait_until(std::condition_variable & cv,
+ const std::chrono::time_point<Clock, Duration> & duration)
+ {
+ assert(s);
+ return cv.wait_until(lk, duration);
+ }
+ };
+
+ Lock lock() { return Lock(this); }
+};
+
+}
diff --git a/src/libutil/types.hh b/src/libutil/types.hh
index 0eae46c5f..33aaf5fc9 100644
--- a/src/libutil/types.hh
+++ b/src/libutil/types.hh
@@ -2,6 +2,8 @@
#include "config.h"
+#include "ref.hh"
+
#include <string>
#include <list>
#include <set>
@@ -97,63 +99,4 @@ typedef enum {
} Verbosity;
-/* A simple non-nullable reference-counted pointer. Actually a wrapper
- around std::shared_ptr that prevents non-null constructions. */
-template<typename T>
-class ref
-{
-private:
-
- std::shared_ptr<T> p;
-
-public:
-
- ref<T>(const ref<T> & r)
- : p(r.p)
- { }
-
- explicit ref<T>(const std::shared_ptr<T> & p)
- : p(p)
- {
- if (!p)
- throw std::invalid_argument("null pointer cast to ref");
- }
-
- T* operator ->() const
- {
- return &*p;
- }
-
- T& operator *() const
- {
- return *p;
- }
-
- operator std::shared_ptr<T> ()
- {
- return p;
- }
-
- template<typename T2>
- operator ref<T2> ()
- {
- return ref<T2>((std::shared_ptr<T2>) p);
- }
-
-private:
-
- template<typename T2, typename... Args>
- friend ref<T2>
- make_ref(Args&&... args);
-
-};
-
-template<typename T, typename... Args>
-inline ref<T>
-make_ref(Args&&... args)
-{
- auto p = std::make_shared<T>(std::forward<Args>(args)...);
- return ref<T>(p);
-}
-
}