aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/remote-store.cc447
-rw-r--r--src/libstore/remote-store.hh22
-rw-r--r--src/libutil/pool.hh102
-rw-r--r--src/libutil/sync.hh78
4 files changed, 414 insertions, 235 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index ab2ebb9ae..847da107a 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -6,6 +6,7 @@
#include "affinity.hh"
#include "globals.hh"
#include "derivations.hh"
+#include "pool.hh"
#include <sys/types.h>
#include <sys/stat.h>
@@ -14,7 +15,6 @@
#include <errno.h>
#include <fcntl.h>
-#include <iostream>
#include <unistd.h>
#include <cstring>
@@ -40,61 +40,20 @@ template PathSet readStorePaths(Source & from);
RemoteStore::RemoteStore()
+ : connections(make_ref<Pool<Connection>>([this]() { return openConnection(); }))
{
- initialised = false;
}
-void RemoteStore::openConnection(bool reserveSpace)
+ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
{
- if (initialised) return;
- initialised = true;
+ auto conn = make_ref<Connection>();
/* Connect to a daemon that does the privileged work for us. */
- connectToDaemon();
-
- from.fd = fdSocket;
- to.fd = fdSocket;
-
- /* Send the magic greeting, check for the reply. */
- try {
- to << WORKER_MAGIC_1;
- to.flush();
- unsigned int magic = readInt(from);
- if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
-
- daemonVersion = readInt(from);
- if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
- throw Error("Nix daemon protocol version not supported");
- to << PROTOCOL_VERSION;
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
- int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
- if (cpu != -1)
- to << 1 << cpu;
- else
- to << 0;
- }
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
- to << reserveSpace;
-
- processStderr();
- }
- catch (Error & e) {
- throw Error(format("cannot start daemon worker: %1%") % e.msg());
- }
-
- setOptions();
-}
-
-
-void RemoteStore::connectToDaemon()
-{
- fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
- if (fdSocket == -1)
+ conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (conn->fd == -1)
throw SysError("cannot create Unix domain socket");
- closeOnExec(fdSocket);
+ closeOnExec(conn->fd);
string socketPath = settings.nixDaemonSocketFile;
@@ -111,111 +70,147 @@ void RemoteStore::connectToDaemon()
addr.sun_family = AF_UNIX;
if (socketPathRel.size() >= sizeof(addr.sun_path))
throw Error(format("socket path ‘%1%’ is too long") % socketPathRel);
- using namespace std;
strcpy(addr.sun_path, socketPathRel.c_str());
- if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1)
+ if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at ‘%1%’") % socketPath);
if (fchdir(fdPrevDir) == -1)
throw SysError("couldn't change back to previous directory");
+
+ conn->from.fd = conn->fd;
+ conn->to.fd = conn->fd;
+
+ /* Send the magic greeting, check for the reply. */
+ try {
+ conn->to << WORKER_MAGIC_1;
+ conn->to.flush();
+ unsigned int magic = readInt(conn->from);
+ if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
+
+ conn->daemonVersion = readInt(conn->from);
+ if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
+ throw Error("Nix daemon protocol version not supported");
+ conn->to << PROTOCOL_VERSION;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
+ int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
+ if (cpu != -1)
+ conn->to << 1 << cpu;
+ else
+ conn->to << 0;
+ }
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
+ conn->to << reserveSpace;
+
+ conn->processStderr();
+ }
+ catch (Error & e) {
+ throw Error(format("cannot start daemon worker: %1%") % e.msg());
+ }
+
+ setOptions(conn);
+
+ return conn;
}
RemoteStore::~RemoteStore()
{
try {
- to.flush();
- fdSocket.close();
+ //to.flush();
+ //fdSocket.close();
+ // FIXME: close pool
} catch (...) {
ignoreException();
}
}
-void RemoteStore::setOptions()
+void RemoteStore::setOptions(ref<Connection> conn)
{
- to << wopSetOptions
+ conn->to << wopSetOptions
<< settings.keepFailed
<< settings.keepGoing
<< settings.tryFallback
<< verbosity
<< settings.maxBuildJobs
<< settings.maxSilentTime;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 2)
- to << settings.useBuildHook;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 4)
- to << settings.buildVerbosity
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2)
+ conn->to << settings.useBuildHook;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4)
+ conn->to << settings.buildVerbosity
<< logType
<< settings.printBuildTrace;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 6)
- to << settings.buildCores;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 10)
- to << settings.useSubstitutes;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6)
+ conn->to << settings.buildCores;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10)
+ conn->to << settings.useSubstitutes;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
Settings::SettingsMap overrides = settings.getOverrides();
if (overrides["ssh-auth-sock"] == "")
overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
- to << overrides.size();
+ conn->to << overrides.size();
for (auto & i : overrides)
- to << i.first << i.second;
+ conn->to << i.first << i.second;
}
- processStderr();
+ conn->processStderr();
}
bool RemoteStore::isValidPath(const Path & path)
{
- openConnection();
- to << wopIsValidPath << path;
- processStderr();
- unsigned int reply = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopIsValidPath << path;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
return reply != 0;
}
PathSet RemoteStore::queryValidPaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths)
if (isValidPath(i)) res.insert(i);
return res;
} else {
- to << wopQueryValidPaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQueryValidPaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
PathSet RemoteStore::queryAllValidPaths()
{
- openConnection();
- to << wopQueryAllValidPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryAllValidPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths) {
- to << wopHasSubstitutes << i;
- processStderr();
- if (readInt(from)) res.insert(i);
+ conn->to << wopHasSubstitutes << i;
+ conn->processStderr();
+ if (readInt(conn->from)) res.insert(i);
}
return res;
} else {
- to << wopQuerySubstitutablePaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQuerySubstitutablePaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
@@ -225,39 +220,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
{
if (paths.empty()) return;
- openConnection();
+ auto conn(connections->get());
- if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) {
SubstitutablePathInfo info;
- to << wopQuerySubstitutablePathInfo << i;
- processStderr();
- unsigned int reply = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfo << i;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0;
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
infos[i] = info;
}
} else {
- to << wopQuerySubstitutablePathInfos << paths;
- processStderr();
- unsigned int count = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfos << paths;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
for (unsigned int n = 0; n < count; n++) {
- Path path = readStorePath(from);
+ Path path = readStorePath(conn->from);
SubstitutablePathInfo & info(infos[path]);
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = readLongLong(from);
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = readLongLong(conn->from);
}
}
@@ -266,27 +261,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
{
- openConnection();
- to << wopQueryPathInfo << path;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopQueryPathInfo << path;
+ conn->processStderr();
ValidPathInfo info;
info.path = path;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.narHash = parseHash(htSHA256, readString(from));
- info.references = readStorePaths<PathSet>(from);
- info.registrationTime = readInt(from);
- info.narSize = readLongLong(from);
+ info.narHash = parseHash(htSHA256, readString(conn->from));
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.registrationTime = readInt(conn->from);
+ info.narSize = readLongLong(conn->from);
return info;
}
Hash RemoteStore::queryPathHash(const Path & path)
{
- openConnection();
- to << wopQueryPathHash << path;
- processStderr();
- string hash = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathHash << path;
+ conn->processStderr();
+ string hash = readString(conn->from);
return parseHash(htSHA256, hash);
}
@@ -294,10 +289,10 @@ Hash RemoteStore::queryPathHash(const Path & path)
void RemoteStore::queryReferences(const Path & path,
PathSet & references)
{
- openConnection();
- to << wopQueryReferences << path;
- processStderr();
- PathSet references2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferences << path;
+ conn->processStderr();
+ PathSet references2 = readStorePaths<PathSet>(conn->from);
references.insert(references2.begin(), references2.end());
}
@@ -305,20 +300,20 @@ void RemoteStore::queryReferences(const Path & path,
void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers)
{
- openConnection();
- to << wopQueryReferrers << path;
- processStderr();
- PathSet referrers2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferrers << path;
+ conn->processStderr();
+ PathSet referrers2 = readStorePaths<PathSet>(conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
Path RemoteStore::queryDeriver(const Path & path)
{
- openConnection();
- to << wopQueryDeriver << path;
- processStderr();
- Path drvPath = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDeriver << path;
+ conn->processStderr();
+ Path drvPath = readString(conn->from);
if (drvPath != "") assertStorePath(drvPath);
return drvPath;
}
@@ -326,37 +321,37 @@ Path RemoteStore::queryDeriver(const Path & path)
PathSet RemoteStore::queryValidDerivers(const Path & path)
{
- openConnection();
- to << wopQueryValidDerivers << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryValidDerivers << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputs << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputs << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputNames << path;
- processStderr();
- return readStrings<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputNames << path;
+ conn->processStderr();
+ return readStrings<PathSet>(conn->from);
}
Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{
- openConnection();
- to << wopQueryPathFromHashPart << hashPart;
- processStderr();
- Path path = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathFromHashPart << hashPart;
+ conn->processStderr();
+ Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path);
return path;
}
@@ -367,32 +362,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
+ auto conn(connections->get());
Path srcPath(absPath(_srcPath));
- to << wopAddToStore << name
+ conn->to << wopAddToStore << name
<< ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
<< (recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
- to.written = 0;
- to.warn = true;
- dumpPath(srcPath, to, filter);
- to.warn = false;
- processStderr();
+ conn->to.written = 0;
+ conn->to.warn = true;
+ dumpPath(srcPath, conn->to, filter);
+ conn->to.warn = false;
+ conn->processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
- processStderr();
+ conn->processStderr();
} catch (EndOfFile & e) { }
throw;
}
- return readStorePath(from);
+ return readStorePath(conn->from);
}
@@ -401,43 +396,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
- to << wopAddTextToStore << name << s << references;
+ auto conn(connections->get());
+ conn->to << wopAddTextToStore << name << s << references;
- processStderr();
- return readStorePath(from);
+ conn->processStderr();
+ return readStorePath(conn->from);
}
void RemoteStore::exportPath(const Path & path, bool sign,
Sink & sink)
{
- openConnection();
- to << wopExportPath << path << (sign ? 1 : 0);
- processStderr(&sink); /* sink receives the actual data */
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopExportPath << path << (sign ? 1 : 0);
+ conn->processStderr(&sink); /* sink receives the actual data */
+ readInt(conn->from);
}
Paths RemoteStore::importPaths(bool requireSignature, Source & source)
{
- openConnection();
- to << wopImportPaths;
+ auto conn(connections->get());
+ conn->to << wopImportPaths;
/* We ignore requireSignature, since the worker forces it to true
anyway. */
- processStderr(0, &source);
- return readStorePaths<Paths>(from);
+ conn->processStderr(0, &source);
+ return readStorePaths<Paths>(conn->from);
}
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{
- openConnection();
- to << wopBuildPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) {
- to << drvPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 15)
- to << buildMode;
+ auto conn(connections->get());
+ conn->to << wopBuildPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
+ conn->to << drvPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
+ conn->to << buildMode;
else
/* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */
@@ -449,22 +444,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
PathSet drvPaths2;
for (auto & i : drvPaths)
drvPaths2.insert(string(i, 0, i.find('!')));
- to << drvPaths2;
+ conn->to << drvPaths2;
}
- processStderr();
- readInt(from);
+ conn->processStderr();
+ readInt(conn->from);
}
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
- openConnection();
- to << wopBuildDerivation << drvPath << drv << buildMode;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopBuildDerivation << drvPath << drv << buildMode;
+ conn->processStderr();
BuildResult res;
unsigned int status;
- from >> status >> res.errorMsg;
+ conn->from >> status >> res.errorMsg;
res.status = (BuildResult::Status) status;
return res;
}
@@ -472,50 +467,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
void RemoteStore::ensurePath(const Path & path)
{
- openConnection();
- to << wopEnsurePath << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopEnsurePath << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addTempRoot(const Path & path)
{
- openConnection();
- to << wopAddTempRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddTempRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addIndirectRoot(const Path & path)
{
- openConnection();
- to << wopAddIndirectRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddIndirectRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::syncWithGC()
{
- openConnection();
- to << wopSyncWithGC;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopSyncWithGC;
+ conn->processStderr();
+ readInt(conn->from);
}
Roots RemoteStore::findRoots()
{
- openConnection();
- to << wopFindRoots;
- processStderr();
- unsigned int count = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopFindRoots;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
Roots result;
while (count--) {
- Path link = readString(from);
- Path target = readStorePath(from);
+ Path link = readString(conn->from);
+ Path target = readStorePath(conn->from);
result[link] = target;
}
return result;
@@ -524,56 +519,56 @@ Roots RemoteStore::findRoots()
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{
- openConnection(false);
+ auto conn(connections->get());
- to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
+ conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
<< options.maxFreed << 0;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 5)
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5)
/* removed options */
- to << 0 << 0;
+ conn->to << 0 << 0;
- processStderr();
+ conn->processStderr();
- results.paths = readStrings<PathSet>(from);
- results.bytesFreed = readLongLong(from);
- readLongLong(from); // obsolete
+ results.paths = readStrings<PathSet>(conn->from);
+ results.bytesFreed = readLongLong(conn->from);
+ readLongLong(conn->from); // obsolete
}
PathSet RemoteStore::queryFailedPaths()
{
- openConnection();
- to << wopQueryFailedPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryFailedPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
void RemoteStore::clearFailedPaths(const PathSet & paths)
{
- openConnection();
- to << wopClearFailedPaths << paths;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopClearFailedPaths << paths;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::optimiseStore()
{
- openConnection();
- to << wopOptimiseStore;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopOptimiseStore;
+ conn->processStderr();
+ readInt(conn->from);
}
bool RemoteStore::verifyStore(bool checkContents, bool repair)
{
- openConnection();
- to << wopVerifyStore << checkContents << repair;
- processStderr();
- return readInt(from) != 0;
+ auto conn(connections->get());
+ conn->to << wopVerifyStore << checkContents << repair;
+ conn->processStderr();
+ return readInt(conn->from) != 0;
}
-void RemoteStore::processStderr(Sink * sink, Source * source)
+void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
to.flush();
unsigned int msg;
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index f15182285..b16a6b51d 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -12,6 +12,7 @@ class Pipe;
class Pid;
struct FdSink;
struct FdSource;
+template<typename T> class Pool;
class RemoteStore : public Store
@@ -91,19 +92,22 @@ public:
bool verifyStore(bool checkContents, bool repair) override;
private:
- AutoCloseFD fdSocket;
- FdSink to;
- FdSource from;
- unsigned int daemonVersion;
- bool initialised;
- void openConnection(bool reserveSpace = true);
+ struct Connection
+ {
+ AutoCloseFD fd;
+ FdSink to;
+ FdSource from;
+ unsigned int daemonVersion;
- void processStderr(Sink * sink = 0, Source * source = 0);
+ void processStderr(Sink * sink = 0, Source * source = 0);
+ };
- void connectToDaemon();
+ ref<Pool<Connection>> connections;
- void setOptions();
+ ref<Connection> openConnection(bool reserveSpace = true);
+
+ void setOptions(ref<Connection> conn);
};
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
new file mode 100644
index 000000000..d63912e28
--- /dev/null
+++ b/src/libutil/pool.hh
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <memory>
+#include <list>
+#include <functional>
+
+#include "sync.hh"
+#include "ref.hh"
+
+namespace nix {
+
+/* This template class implements a simple pool manager of resources
+ of some type R, such as database connections. It is used as
+ follows:
+
+ class Connection { ... };
+
+ Pool<Connection> pool;
+
+ {
+ auto conn(pool.get());
+ conn->exec("select ...");
+ }
+
+ Here, the Connection object referenced by ‘conn’ is automatically
+ returned to the pool when ‘conn’ goes out of scope.
+*/
+
+template <class R>
+class Pool
+{
+public:
+
+ typedef std::function<ref<R>()> Factory;
+
+private:
+
+ Factory factory;
+
+ struct State
+ {
+ unsigned int count = 0;
+ std::list<ref<R>> idle;
+ };
+
+ Sync<State> state;
+
+public:
+
+ Pool(const Factory & factory = []() { return make_ref<R>(); })
+ : factory(factory)
+ { }
+
+ class Handle
+ {
+ private:
+ Pool & pool;
+ ref<R> r;
+
+ friend Pool;
+
+ Handle(Pool & pool, std::shared_ptr<R> r) : pool(pool), r(r) { }
+
+ public:
+ Handle(Handle && h) : pool(h.pool), r(h.r) { abort(); }
+
+ Handle(const Handle & l) = delete;
+
+ ~Handle()
+ {
+ auto state_(pool.state.lock());
+ state_->idle.push_back(r);
+ }
+
+ R * operator -> () { return &*r; }
+ R & operator * () { return *r; }
+ };
+
+ Handle get()
+ {
+ {
+ auto state_(state.lock());
+ if (!state_->idle.empty()) {
+ auto p = state_->idle.back();
+ state_->idle.pop_back();
+ return Handle(*this, p);
+ }
+ state_->count++;
+ }
+ /* Note: we don't hold the lock while creating a new instance,
+ because creation might take a long time. */
+ return Handle(*this, factory());
+ }
+
+ unsigned int count()
+ {
+ auto state_(state.lock());
+ return state_->count;
+ }
+};
+
+}
diff --git a/src/libutil/sync.hh b/src/libutil/sync.hh
new file mode 100644
index 000000000..3abffa7c7
--- /dev/null
+++ b/src/libutil/sync.hh
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+namespace nix {
+
+/* This template class ensures synchronized access to a value of type
+ T. It is used as follows:
+
+ struct Data { int x; ... };
+
+ Sync<Data> data;
+
+ {
+ auto data_(data.lock());
+ data_->x = 123;
+ }
+
+ Here, "data" is automatically unlocked when "data_" goes out of
+ scope.
+*/
+
+template<class T>
+class Sync
+{
+private:
+ std::mutex mutex;
+ T data;
+
+public:
+
+ Sync() { }
+ Sync(const T & data) : data(data) { }
+
+ class Lock
+ {
+ private:
+ Sync * s;
+ friend Sync;
+ Lock(Sync * s) : s(s) { s->mutex.lock(); }
+ public:
+ Lock(Lock && l) : s(l.s) { l.s = 0; }
+ Lock(const Lock & l) = delete;
+ ~Lock() { if (s) s->mutex.unlock(); }
+ T * operator -> () { return &s->data; }
+ T & operator * () { return s->data; }
+
+ /* FIXME: performance impact of condition_variable_any? */
+ void wait(std::condition_variable_any & cv)
+ {
+ assert(s);
+ cv.wait(s->mutex);
+ }
+
+ template<class Rep, class Period, class Predicate>
+ bool wait_for(std::condition_variable_any & cv,
+ const std::chrono::duration<Rep, Period> & duration,
+ Predicate pred)
+ {
+ assert(s);
+ return cv.wait_for(s->mutex, duration, pred);
+ }
+
+ template<class Clock, class Duration>
+ std::cv_status wait_until(std::condition_variable_any & cv,
+ const std::chrono::time_point<Clock, Duration> & duration)
+ {
+ assert(s);
+ return cv.wait_until(s->mutex, duration);
+ }
+ };
+
+ Lock lock() { return Lock(this); }
+};
+
+}