aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc465
1 files changed, 232 insertions, 233 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index ab2ebb9ae..2f540c640 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -6,6 +6,7 @@
#include "affinity.hh"
#include "globals.hh"
#include "derivations.hh"
+#include "pool.hh"
#include <sys/types.h>
#include <sys/stat.h>
@@ -13,9 +14,8 @@
#include <sys/un.h>
#include <errno.h>
#include <fcntl.h>
-
-#include <iostream>
#include <unistd.h>
+
#include <cstring>
namespace nix {
@@ -39,62 +39,25 @@ template<class T> T readStorePaths(Source & from)
template PathSet readStorePaths(Source & from);
-RemoteStore::RemoteStore()
+RemoteStore::RemoteStore(size_t maxConnections)
+ : connections(make_ref<Pool<Connection>>(
+ maxConnections,
+ [this]() { return openConnection(); },
+ [](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
+ ))
{
- initialised = false;
}
-void RemoteStore::openConnection(bool reserveSpace)
+ref<RemoteStore::Connection> RemoteStore::openConnection(bool reserveSpace)
{
- if (initialised) return;
- initialised = true;
+ auto conn = make_ref<Connection>();
/* Connect to a daemon that does the privileged work for us. */
- connectToDaemon();
-
- from.fd = fdSocket;
- to.fd = fdSocket;
-
- /* Send the magic greeting, check for the reply. */
- try {
- to << WORKER_MAGIC_1;
- to.flush();
- unsigned int magic = readInt(from);
- if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
-
- daemonVersion = readInt(from);
- if (GET_PROTOCOL_MAJOR(daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
- throw Error("Nix daemon protocol version not supported");
- to << PROTOCOL_VERSION;
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
- int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
- if (cpu != -1)
- to << 1 << cpu;
- else
- to << 0;
- }
-
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
- to << reserveSpace;
-
- processStderr();
- }
- catch (Error & e) {
- throw Error(format("cannot start daemon worker: %1%") % e.msg());
- }
-
- setOptions();
-}
-
-
-void RemoteStore::connectToDaemon()
-{
- fdSocket = socket(PF_UNIX, SOCK_STREAM, 0);
- if (fdSocket == -1)
+ conn->fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (conn->fd == -1)
throw SysError("cannot create Unix domain socket");
- closeOnExec(fdSocket);
+ closeOnExec(conn->fd);
string socketPath = settings.nixDaemonSocketFile;
@@ -111,111 +74,135 @@ void RemoteStore::connectToDaemon()
addr.sun_family = AF_UNIX;
if (socketPathRel.size() >= sizeof(addr.sun_path))
throw Error(format("socket path ‘%1%’ is too long") % socketPathRel);
- using namespace std;
strcpy(addr.sun_path, socketPathRel.c_str());
- if (connect(fdSocket, (struct sockaddr *) &addr, sizeof(addr)) == -1)
+ if (connect(conn->fd, (struct sockaddr *) &addr, sizeof(addr)) == -1)
throw SysError(format("cannot connect to daemon at ‘%1%’") % socketPath);
if (fchdir(fdPrevDir) == -1)
throw SysError("couldn't change back to previous directory");
-}
+ conn->from.fd = conn->fd;
+ conn->to.fd = conn->fd;
-RemoteStore::~RemoteStore()
-{
+ /* Send the magic greeting, check for the reply. */
try {
- to.flush();
- fdSocket.close();
- } catch (...) {
- ignoreException();
+ conn->to << WORKER_MAGIC_1;
+ conn->to.flush();
+ unsigned int magic = readInt(conn->from);
+ if (magic != WORKER_MAGIC_2) throw Error("protocol mismatch");
+
+ conn->daemonVersion = readInt(conn->from);
+ if (GET_PROTOCOL_MAJOR(conn->daemonVersion) != GET_PROTOCOL_MAJOR(PROTOCOL_VERSION))
+ throw Error("Nix daemon protocol version not supported");
+ conn->to << PROTOCOL_VERSION;
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 14) {
+ int cpu = settings.lockCPU ? lockToCurrentCPU() : -1;
+ if (cpu != -1)
+ conn->to << 1 << cpu;
+ else
+ conn->to << 0;
+ }
+
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 11)
+ conn->to << reserveSpace;
+
+ conn->processStderr();
}
+ catch (Error & e) {
+ throw Error(format("cannot start daemon worker: %1%") % e.msg());
+ }
+
+ setOptions(conn);
+
+ return conn;
}
-void RemoteStore::setOptions()
+void RemoteStore::setOptions(ref<Connection> conn)
{
- to << wopSetOptions
+ conn->to << wopSetOptions
<< settings.keepFailed
<< settings.keepGoing
<< settings.tryFallback
<< verbosity
<< settings.maxBuildJobs
<< settings.maxSilentTime;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 2)
- to << settings.useBuildHook;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 4)
- to << settings.buildVerbosity
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 2)
+ conn->to << settings.useBuildHook;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 4)
+ conn->to << settings.buildVerbosity
<< logType
<< settings.printBuildTrace;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 6)
- to << settings.buildCores;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 10)
- to << settings.useSubstitutes;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 6)
+ conn->to << settings.buildCores;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 10)
+ conn->to << settings.useSubstitutes;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 12) {
Settings::SettingsMap overrides = settings.getOverrides();
if (overrides["ssh-auth-sock"] == "")
overrides["ssh-auth-sock"] = getEnv("SSH_AUTH_SOCK");
- to << overrides.size();
+ conn->to << overrides.size();
for (auto & i : overrides)
- to << i.first << i.second;
+ conn->to << i.first << i.second;
}
- processStderr();
+ conn->processStderr();
}
bool RemoteStore::isValidPath(const Path & path)
{
- openConnection();
- to << wopIsValidPath << path;
- processStderr();
- unsigned int reply = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopIsValidPath << path;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
return reply != 0;
}
PathSet RemoteStore::queryValidPaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths)
if (isValidPath(i)) res.insert(i);
return res;
} else {
- to << wopQueryValidPaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQueryValidPaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
PathSet RemoteStore::queryAllValidPaths()
{
- openConnection();
- to << wopQueryAllValidPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryAllValidPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::querySubstitutablePaths(const PathSet & paths)
{
- openConnection();
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ auto conn(connections->get());
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
PathSet res;
for (auto & i : paths) {
- to << wopHasSubstitutes << i;
- processStderr();
- if (readInt(from)) res.insert(i);
+ conn->to << wopHasSubstitutes << i;
+ conn->processStderr();
+ if (readInt(conn->from)) res.insert(i);
}
return res;
} else {
- to << wopQuerySubstitutablePaths << paths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ conn->to << wopQuerySubstitutablePaths << paths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
}
@@ -225,39 +212,39 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
{
if (paths.empty()) return;
- openConnection();
+ auto conn(connections->get());
- if (GET_PROTOCOL_MINOR(daemonVersion) < 3) return;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 3) return;
- if (GET_PROTOCOL_MINOR(daemonVersion) < 12) {
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
for (auto & i : paths) {
SubstitutablePathInfo info;
- to << wopQuerySubstitutablePathInfo << i;
- processStderr();
- unsigned int reply = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfo << i;
+ conn->processStderr();
+ unsigned int reply = readInt(conn->from);
if (reply == 0) continue;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = GET_PROTOCOL_MINOR(daemonVersion) >= 7 ? readLongLong(from) : 0;
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 7 ? readLongLong(conn->from) : 0;
infos[i] = info;
}
} else {
- to << wopQuerySubstitutablePathInfos << paths;
- processStderr();
- unsigned int count = readInt(from);
+ conn->to << wopQuerySubstitutablePathInfos << paths;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
for (unsigned int n = 0; n < count; n++) {
- Path path = readStorePath(from);
+ Path path = readStorePath(conn->from);
SubstitutablePathInfo & info(infos[path]);
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.references = readStorePaths<PathSet>(from);
- info.downloadSize = readLongLong(from);
- info.narSize = readLongLong(from);
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.downloadSize = readLongLong(conn->from);
+ info.narSize = readLongLong(conn->from);
}
}
@@ -266,27 +253,27 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
ValidPathInfo RemoteStore::queryPathInfo(const Path & path)
{
- openConnection();
- to << wopQueryPathInfo << path;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopQueryPathInfo << path;
+ conn->processStderr();
ValidPathInfo info;
info.path = path;
- info.deriver = readString(from);
+ info.deriver = readString(conn->from);
if (info.deriver != "") assertStorePath(info.deriver);
- info.narHash = parseHash(htSHA256, readString(from));
- info.references = readStorePaths<PathSet>(from);
- info.registrationTime = readInt(from);
- info.narSize = readLongLong(from);
+ info.narHash = parseHash(htSHA256, readString(conn->from));
+ info.references = readStorePaths<PathSet>(conn->from);
+ info.registrationTime = readInt(conn->from);
+ info.narSize = readLongLong(conn->from);
return info;
}
Hash RemoteStore::queryPathHash(const Path & path)
{
- openConnection();
- to << wopQueryPathHash << path;
- processStderr();
- string hash = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathHash << path;
+ conn->processStderr();
+ string hash = readString(conn->from);
return parseHash(htSHA256, hash);
}
@@ -294,10 +281,10 @@ Hash RemoteStore::queryPathHash(const Path & path)
void RemoteStore::queryReferences(const Path & path,
PathSet & references)
{
- openConnection();
- to << wopQueryReferences << path;
- processStderr();
- PathSet references2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferences << path;
+ conn->processStderr();
+ PathSet references2 = readStorePaths<PathSet>(conn->from);
references.insert(references2.begin(), references2.end());
}
@@ -305,20 +292,20 @@ void RemoteStore::queryReferences(const Path & path,
void RemoteStore::queryReferrers(const Path & path,
PathSet & referrers)
{
- openConnection();
- to << wopQueryReferrers << path;
- processStderr();
- PathSet referrers2 = readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryReferrers << path;
+ conn->processStderr();
+ PathSet referrers2 = readStorePaths<PathSet>(conn->from);
referrers.insert(referrers2.begin(), referrers2.end());
}
Path RemoteStore::queryDeriver(const Path & path)
{
- openConnection();
- to << wopQueryDeriver << path;
- processStderr();
- Path drvPath = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDeriver << path;
+ conn->processStderr();
+ Path drvPath = readString(conn->from);
if (drvPath != "") assertStorePath(drvPath);
return drvPath;
}
@@ -326,37 +313,37 @@ Path RemoteStore::queryDeriver(const Path & path)
PathSet RemoteStore::queryValidDerivers(const Path & path)
{
- openConnection();
- to << wopQueryValidDerivers << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryValidDerivers << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputs(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputs << path;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputs << path;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
PathSet RemoteStore::queryDerivationOutputNames(const Path & path)
{
- openConnection();
- to << wopQueryDerivationOutputNames << path;
- processStderr();
- return readStrings<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryDerivationOutputNames << path;
+ conn->processStderr();
+ return readStrings<PathSet>(conn->from);
}
Path RemoteStore::queryPathFromHashPart(const string & hashPart)
{
- openConnection();
- to << wopQueryPathFromHashPart << hashPart;
- processStderr();
- Path path = readString(from);
+ auto conn(connections->get());
+ conn->to << wopQueryPathFromHashPart << hashPart;
+ conn->processStderr();
+ Path path = readString(conn->from);
if (!path.empty()) assertStorePath(path);
return path;
}
@@ -367,32 +354,32 @@ Path RemoteStore::addToStore(const string & name, const Path & _srcPath,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
+ auto conn(connections->get());
Path srcPath(absPath(_srcPath));
- to << wopAddToStore << name
+ conn->to << wopAddToStore << name
<< ((hashAlgo == htSHA256 && recursive) ? 0 : 1) /* backwards compatibility hack */
<< (recursive ? 1 : 0)
<< printHashType(hashAlgo);
try {
- to.written = 0;
- to.warn = true;
- dumpPath(srcPath, to, filter);
- to.warn = false;
- processStderr();
+ conn->to.written = 0;
+ conn->to.warn = true;
+ dumpPath(srcPath, conn->to, filter);
+ conn->to.warn = false;
+ conn->processStderr();
} catch (SysError & e) {
/* Daemon closed while we were sending the path. Probably OOM
or I/O error. */
if (e.errNo == EPIPE)
try {
- processStderr();
+ conn->processStderr();
} catch (EndOfFile & e) { }
throw;
}
- return readStorePath(from);
+ return readStorePath(conn->from);
}
@@ -401,43 +388,43 @@ Path RemoteStore::addTextToStore(const string & name, const string & s,
{
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
- openConnection();
- to << wopAddTextToStore << name << s << references;
+ auto conn(connections->get());
+ conn->to << wopAddTextToStore << name << s << references;
- processStderr();
- return readStorePath(from);
+ conn->processStderr();
+ return readStorePath(conn->from);
}
void RemoteStore::exportPath(const Path & path, bool sign,
Sink & sink)
{
- openConnection();
- to << wopExportPath << path << (sign ? 1 : 0);
- processStderr(&sink); /* sink receives the actual data */
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopExportPath << path << (sign ? 1 : 0);
+ conn->processStderr(&sink); /* sink receives the actual data */
+ readInt(conn->from);
}
Paths RemoteStore::importPaths(bool requireSignature, Source & source)
{
- openConnection();
- to << wopImportPaths;
+ auto conn(connections->get());
+ conn->to << wopImportPaths;
/* We ignore requireSignature, since the worker forces it to true
anyway. */
- processStderr(0, &source);
- return readStorePaths<Paths>(from);
+ conn->processStderr(0, &source);
+ return readStorePaths<Paths>(conn->from);
}
void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
{
- openConnection();
- to << wopBuildPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 13) {
- to << drvPaths;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 15)
- to << buildMode;
+ auto conn(connections->get());
+ conn->to << wopBuildPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13) {
+ conn->to << drvPaths;
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
+ conn->to << buildMode;
else
/* Old daemons did not take a 'buildMode' parameter, so we
need to validate it here on the client side. */
@@ -449,22 +436,22 @@ void RemoteStore::buildPaths(const PathSet & drvPaths, BuildMode buildMode)
PathSet drvPaths2;
for (auto & i : drvPaths)
drvPaths2.insert(string(i, 0, i.find('!')));
- to << drvPaths2;
+ conn->to << drvPaths2;
}
- processStderr();
- readInt(from);
+ conn->processStderr();
+ readInt(conn->from);
}
BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDerivation & drv,
BuildMode buildMode)
{
- openConnection();
- to << wopBuildDerivation << drvPath << drv << buildMode;
- processStderr();
+ auto conn(connections->get());
+ conn->to << wopBuildDerivation << drvPath << drv << buildMode;
+ conn->processStderr();
BuildResult res;
unsigned int status;
- from >> status >> res.errorMsg;
+ conn->from >> status >> res.errorMsg;
res.status = (BuildResult::Status) status;
return res;
}
@@ -472,50 +459,50 @@ BuildResult RemoteStore::buildDerivation(const Path & drvPath, const BasicDeriva
void RemoteStore::ensurePath(const Path & path)
{
- openConnection();
- to << wopEnsurePath << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopEnsurePath << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addTempRoot(const Path & path)
{
- openConnection();
- to << wopAddTempRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddTempRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::addIndirectRoot(const Path & path)
{
- openConnection();
- to << wopAddIndirectRoot << path;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopAddIndirectRoot << path;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::syncWithGC()
{
- openConnection();
- to << wopSyncWithGC;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopSyncWithGC;
+ conn->processStderr();
+ readInt(conn->from);
}
Roots RemoteStore::findRoots()
{
- openConnection();
- to << wopFindRoots;
- processStderr();
- unsigned int count = readInt(from);
+ auto conn(connections->get());
+ conn->to << wopFindRoots;
+ conn->processStderr();
+ unsigned int count = readInt(conn->from);
Roots result;
while (count--) {
- Path link = readString(from);
- Path target = readStorePath(from);
+ Path link = readString(conn->from);
+ Path target = readStorePath(conn->from);
result[link] = target;
}
return result;
@@ -524,56 +511,68 @@ Roots RemoteStore::findRoots()
void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results)
{
- openConnection(false);
+ auto conn(connections->get());
- to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
+ conn->to << wopCollectGarbage << options.action << options.pathsToDelete << options.ignoreLiveness
<< options.maxFreed << 0;
- if (GET_PROTOCOL_MINOR(daemonVersion) >= 5)
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 5)
/* removed options */
- to << 0 << 0;
+ conn->to << 0 << 0;
- processStderr();
+ conn->processStderr();
- results.paths = readStrings<PathSet>(from);
- results.bytesFreed = readLongLong(from);
- readLongLong(from); // obsolete
+ results.paths = readStrings<PathSet>(conn->from);
+ results.bytesFreed = readLongLong(conn->from);
+ readLongLong(conn->from); // obsolete
}
PathSet RemoteStore::queryFailedPaths()
{
- openConnection();
- to << wopQueryFailedPaths;
- processStderr();
- return readStorePaths<PathSet>(from);
+ auto conn(connections->get());
+ conn->to << wopQueryFailedPaths;
+ conn->processStderr();
+ return readStorePaths<PathSet>(conn->from);
}
void RemoteStore::clearFailedPaths(const PathSet & paths)
{
- openConnection();
- to << wopClearFailedPaths << paths;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopClearFailedPaths << paths;
+ conn->processStderr();
+ readInt(conn->from);
}
void RemoteStore::optimiseStore()
{
- openConnection();
- to << wopOptimiseStore;
- processStderr();
- readInt(from);
+ auto conn(connections->get());
+ conn->to << wopOptimiseStore;
+ conn->processStderr();
+ readInt(conn->from);
}
bool RemoteStore::verifyStore(bool checkContents, bool repair)
{
- openConnection();
- to << wopVerifyStore << checkContents << repair;
- processStderr();
- return readInt(from) != 0;
+ auto conn(connections->get());
+ conn->to << wopVerifyStore << checkContents << repair;
+ conn->processStderr();
+ return readInt(conn->from) != 0;
+}
+
+
+RemoteStore::Connection::~Connection()
+{
+ try {
+ to.flush();
+ fd.close();
+ } catch (...) {
+ ignoreException();
+ }
}
-void RemoteStore::processStderr(Sink * sink, Source * source)
+
+void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
to.flush();
unsigned int msg;