aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEelco Dolstra <e.dolstra@tudelft.nl>2006-12-03 02:08:13 +0000
committerEelco Dolstra <e.dolstra@tudelft.nl>2006-12-03 02:08:13 +0000
commit7951c3c5460324c652d42f5f92bcae44e0a0b9c7 (patch)
treeda8595f52e710af3313a177cc73a1a1b0cfffe87 /src
parent714fa24cfb5afeb144549e0cc4808cc2a1c459cf (diff)
* Some hackery to propagate the worker's stderr and exceptions to the
client.
Diffstat (limited to 'src')
-rw-r--r--src/libstore/build.cc6
-rw-r--r--src/libstore/remote-store.cc39
-rw-r--r--src/libstore/remote-store.hh2
-rw-r--r--src/libstore/worker-protocol.hh5
-rw-r--r--src/libutil/util.cc11
-rw-r--r--src/libutil/util.hh2
-rw-r--r--src/nix-worker/main.cc245
7 files changed, 206 insertions, 104 deletions
diff --git a/src/libstore/build.cc b/src/libstore/build.cc
index 71560b2d0..d8b90252b 100644
--- a/src/libstore/build.cc
+++ b/src/libstore/build.cc
@@ -872,7 +872,7 @@ static void drain(int fd)
if (errno != EINTR)
throw SysError("draining");
} else if (rd == 0) break;
- else writeFull(STDERR_FILENO, buffer, rd);
+ else writeToStderr(buffer, rd);
}
}
@@ -1610,7 +1610,7 @@ void DerivationGoal::handleChildOutput(int fd, const string & data)
{
if (fd == logPipe.readSide) {
if (verbosity >= buildVerbosity)
- writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size());
+ writeToStderr((unsigned char *) data.c_str(), data.size());
writeFull(fdLogFile, (unsigned char *) data.c_str(), data.size());
}
@@ -1923,7 +1923,7 @@ void SubstitutionGoal::handleChildOutput(int fd, const string & data)
{
assert(fd == logPipe.readSide);
if (verbosity >= buildVerbosity)
- writeFull(STDERR_FILENO, (unsigned char *) data.c_str(), data.size());
+ writeToStderr((unsigned char *) data.c_str(), data.size());
/* Don't write substitution output to a log file for now. We
probably should, though. */
}
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 9b9d74f7e..87547ce91 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -4,6 +4,10 @@
#include "worker-protocol.hh"
#include "archive.hh"
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
#include <iostream>
#include <unistd.h>
@@ -38,9 +42,15 @@ RemoteStore::RemoteStore()
if (dup2(toChild.readSide, STDIN_FILENO) == -1)
throw SysError("dupping read side");
- execlp(worker.c_str(), worker.c_str(),
- "--slave", NULL);
+ int fdDebug = open("/tmp/worker-log", O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ assert(fdDebug != -1);
+ if (dup2(fdDebug, STDERR_FILENO) == -1)
+ throw SysError("dupping stderr");
+ close(fdDebug);
+ execlp(worker.c_str(), worker.c_str(),
+ "-vvv", "--slave", NULL);
+
throw SysError(format("executing `%1%'") % worker);
} catch (std::exception & e) {
@@ -66,9 +76,13 @@ RemoteStore::RemoteStore()
RemoteStore::~RemoteStore()
{
- writeInt(wopQuit, to);
- readInt(from);
- child.wait(true);
+ try {
+ fromChild.readSide.close();
+ toChild.writeSide.close();
+ child.wait(true);
+ } catch (Error & e) {
+ printMsg(lvlError, format("error (ignored): %1%") % e.msg());
+ }
}
@@ -158,6 +172,7 @@ void RemoteStore::buildDerivations(const PathSet & drvPaths)
{
writeInt(wopBuildDerivations, to);
writeStringSet(drvPaths, to);
+ processStderr();
readInt(from);
}
@@ -185,4 +200,18 @@ void RemoteStore::syncWithGC()
}
+void RemoteStore::processStderr()
+{
+ unsigned int msg;
+ while ((msg = readInt(from)) == STDERR_NEXT) {
+ string s = readString(from);
+ writeToStderr((unsigned char *) s.c_str(), s.size());
+ }
+ if (msg == STDERR_ERROR)
+ throw Error(readString(from));
+ else if (msg != STDERR_LAST)
+ throw Error("protocol error processing standard error");
+}
+
+
}
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index b11191c09..05d2a21ec 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -57,6 +57,8 @@ private:
FdSink to;
FdSource from;
Pid child;
+
+ void processStderr();
};
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 2700b6719..284477483 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -23,4 +23,9 @@ typedef enum {
} WorkerOp;
+#define STDERR_NEXT 0x6f6c6d67
+#define STDERR_LAST 0x616c7473
+#define STDERR_ERROR 0x63787470
+
+
#endif /* !__WORKER_PROTOCOL_H */
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 6d96310da..4460d95b8 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -437,7 +437,7 @@ void printMsg_(Verbosity level, const format & f)
else if (logType == ltEscapes && level != lvlInfo)
prefix = "\033[" + escVerbosity(level) + "s";
string s = (format("%1%%2%\n") % prefix % f.str()).str();
- writeFull(STDERR_FILENO, (const unsigned char *) s.c_str(), s.size());
+ writeToStderr((const unsigned char *) s.c_str(), s.size());
}
@@ -450,6 +450,15 @@ void warnOnce(bool & haveWarned, const format & f)
}
+static void defaultWriteToStderr(const unsigned char * buf, size_t count)
+{
+ writeFull(STDERR_FILENO, buf, count);
+}
+
+
+void (*writeToStderr) (const unsigned char * buf, size_t count) = defaultWriteToStderr;
+
+
void readFull(int fd, unsigned char * buf, size_t count)
{
while (count) {
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index d49067dfe..0d39ffee9 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -131,6 +131,8 @@ void printMsg_(Verbosity level, const format & f);
void warnOnce(bool & haveWarned, const format & f);
+extern void (*writeToStderr) (const unsigned char * buf, size_t count);
+
/* Wrappers arount read()/write() that read/write exactly the
requested number of bytes. */
diff --git a/src/nix-worker/main.cc b/src/nix-worker/main.cc
index cf550895e..17e892c64 100644
--- a/src/nix-worker/main.cc
+++ b/src/nix-worker/main.cc
@@ -10,7 +10,7 @@
using namespace nix;
-Path readStorePath(Source & from)
+static Path readStorePath(Source & from)
{
Path path = readString(from);
assertStorePath(path);
@@ -18,7 +18,7 @@ Path readStorePath(Source & from)
}
-PathSet readStorePaths(Source & from)
+static PathSet readStorePaths(Source & from)
{
PathSet paths = readStringSet(from);
for (PathSet::iterator i = paths.begin(); i != paths.end(); ++i)
@@ -27,123 +27,178 @@ PathSet readStorePaths(Source & from)
}
-void processConnection(Source & from, Sink & to)
+static Sink * _to; /* !!! should make writeToStderr an object */
+bool canSendStderr;
+
+
+static void tunnelStderr(const unsigned char * buf, size_t count)
{
- store = boost::shared_ptr<StoreAPI>(new LocalStore(true));
+ writeFull(STDERR_FILENO, buf, count);
+ if (canSendStderr) {
+ try {
+ writeInt(STDERR_NEXT, *_to);
+ writeString(string((char *) buf, count), *_to);
+ } catch (...) {
+ /* Write failed; that means that the other side is
+ gone. */
+ canSendStderr = false;
+ throw;
+ }
+ }
+}
- unsigned int magic = readInt(from);
- if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
- writeInt(WORKER_MAGIC_2, to);
+/* startWork() means that we're starting an operation for which we
+ want to send out stderr to the client. */
+static void startWork()
+{
+ canSendStderr = true;
+}
- debug("greeting exchanged");
- bool quit = false;
+/* stopWork() means that we're done; stop sending stderr to the
+ client. */
+static void stopWork()
+{
+ canSendStderr = false;
+ writeInt(STDERR_LAST, *_to);
+}
- unsigned int opCount = 0;
-
- do {
+
+static void performOp(Source & from, Sink & to, unsigned int op)
+{
+ switch (op) {
+
+#if 0
+ case wopQuit: {
+ /* Close the database. */
+ store.reset((StoreAPI *) 0);
+ writeInt(1, to);
+ break;
+ }
+#endif
+
+ case wopIsValidPath: {
+ Path path = readStorePath(from);
+ writeInt(store->isValidPath(path), to);
+ break;
+ }
+
+ case wopHasSubstitutes: {
+ Path path = readStorePath(from);
+ writeInt(store->hasSubstitutes(path), to);
+ break;
+ }
+
+ case wopQueryPathHash: {
+ Path path = readStorePath(from);
+ writeString(printHash(store->queryPathHash(path)), to);
+ break;
+ }
+
+ case wopQueryReferences:
+ case wopQueryReferrers: {
+ Path path = readStorePath(from);
+ PathSet paths;
+ if (op == wopQueryReferences)
+ store->queryReferences(path, paths);
+ else
+ store->queryReferrers(path, paths);
+ writeStringSet(paths, to);
+ break;
+ }
+
+ case wopAddToStore: {
+ /* !!! uberquick hack */
+ string baseName = readString(from);
+ bool fixed = readInt(from) == 1;
+ bool recursive = readInt(from) == 1;
+ string hashAlgo = readString(from);
- WorkerOp op = (WorkerOp) readInt(from);
+ Path tmp = createTempDir();
+ Path tmp2 = tmp + "/" + baseName;
+ restorePath(tmp2, from);
- opCount++;
+ writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
+
+ deletePath(tmp);
+ break;
+ }
- switch (op) {
+ case wopAddTextToStore: {
+ string suffix = readString(from);
+ string s = readString(from);
+ PathSet refs = readStorePaths(from);
+ writeString(store->addTextToStore(suffix, s, refs), to);
+ break;
+ }
- case wopQuit: {
- /* Close the database. */
- store.reset((StoreAPI *) 0);
- writeInt(1, to);
- quit = true;
- break;
- }
+ case wopBuildDerivations: {
+ PathSet drvs = readStorePaths(from);
+ startWork();
+ store->buildDerivations(drvs);
+ stopWork();
+ writeInt(1, to);
+ break;
+ }
- case wopIsValidPath: {
- Path path = readStorePath(from);
- writeInt(store->isValidPath(path), to);
- break;
- }
+ case wopEnsurePath: {
+ Path path = readStorePath(from);
+ store->ensurePath(path);
+ writeInt(1, to);
+ break;
+ }
- case wopHasSubstitutes: {
- Path path = readStorePath(from);
- writeInt(store->hasSubstitutes(path), to);
- break;
- }
+ case wopAddTempRoot: {
+ Path path = readStorePath(from);
+ store->addTempRoot(path);
+ writeInt(1, to);
+ break;
+ }
- case wopQueryPathHash: {
- Path path = readStorePath(from);
- writeString(printHash(store->queryPathHash(path)), to);
- break;
- }
+ case wopSyncWithGC: {
+ store->syncWithGC();
+ writeInt(1, to);
+ break;
+ }
- case wopQueryReferences:
- case wopQueryReferrers: {
- Path path = readStorePath(from);
- PathSet paths;
- if (op == wopQueryReferences)
- store->queryReferences(path, paths);
- else
- store->queryReferrers(path, paths);
- writeStringSet(paths, to);
- break;
- }
+ default:
+ throw Error(format("invalid operation %1%") % op);
+ }
+}
- case wopAddToStore: {
- /* !!! uberquick hack */
- string baseName = readString(from);
- bool fixed = readInt(from) == 1;
- bool recursive = readInt(from) == 1;
- string hashAlgo = readString(from);
- Path tmp = createTempDir();
- Path tmp2 = tmp + "/" + baseName;
- restorePath(tmp2, from);
+static void processConnection(Source & from, Sink & to)
+{
+ store = boost::shared_ptr<StoreAPI>(new LocalStore(true));
- writeString(store->addToStore(tmp2, fixed, recursive, hashAlgo), to);
-
- deletePath(tmp);
- break;
- }
+ unsigned int magic = readInt(from);
+ if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
- case wopAddTextToStore: {
- string suffix = readString(from);
- string s = readString(from);
- PathSet refs = readStorePaths(from);
- writeString(store->addTextToStore(suffix, s, refs), to);
- break;
- }
+ writeInt(WORKER_MAGIC_2, to);
- case wopBuildDerivations: {
- PathSet drvs = readStorePaths(from);
- store->buildDerivations(drvs);
- writeInt(1, to);
- break;
- }
+ debug("greeting exchanged");
- case wopEnsurePath: {
- Path path = readStorePath(from);
- store->ensurePath(path);
- writeInt(1, to);
- break;
- }
+ _to = &to;
+ canSendStderr = false;
+ writeToStderr = tunnelStderr;
- case wopAddTempRoot: {
- Path path = readStorePath(from);
- store->addTempRoot(path);
- writeInt(1, to);
- break;
- }
+ bool quit = false;
- case wopSyncWithGC: {
- store->syncWithGC();
- writeInt(1, to);
- break;
- }
+ unsigned int opCount = 0;
+
+ do {
+ WorkerOp op = (WorkerOp) readInt(from);
- default:
- throw Error(format("invalid operation %1%") % op);
+ opCount++;
+
+ try {
+ performOp(from, to, op);
+ } catch (Error & e) {
+ writeInt(STDERR_ERROR, *_to);
+ writeString(e.msg(), to);
}
-
+
} while (!quit);
printMsg(lvlError, format("%1% worker operations") % opCount);