aboutsummaryrefslogtreecommitdiff
path: root/src/nix-worker/nix-worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/nix-worker/nix-worker.cc')
-rw-r--r--src/nix-worker/nix-worker.cc86
1 files changed, 49 insertions, 37 deletions
diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc
index 3587bd7fd..68567f341 100644
--- a/src/nix-worker/nix-worker.cc
+++ b/src/nix-worker/nix-worker.cc
@@ -56,7 +56,8 @@ static void tunnelStderr(const unsigned char * buf, size_t count)
if (canSendStderr && myPid == getpid()) {
try {
writeInt(STDERR_NEXT, to);
- writeString(string((char *) buf, count), to);
+ writeString(buf, count, to);
+ to.flush();
} catch (...) {
/* Write failed; that means that the other side is
gone. */
@@ -200,26 +201,20 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int
struct TunnelSink : Sink
{
Sink & to;
- TunnelSink(Sink & to) : to(to)
- {
- }
- virtual void operator ()
- (const unsigned char * data, unsigned int len)
+ TunnelSink(Sink & to) : to(to) { }
+ virtual void operator () (const unsigned char * data, size_t len)
{
writeInt(STDERR_WRITE, to);
- writeString(string((const char *) data, len), to);
+ writeString(data, len, to);
}
};
-struct TunnelSource : Source
+struct TunnelSource : BufferedSource
{
Source & from;
- TunnelSource(Source & from) : from(from)
- {
- }
- virtual void operator ()
- (unsigned char * data, unsigned int len)
+ TunnelSource(Source & from) : from(from) { }
+ size_t readUnbuffered(unsigned char * data, size_t len)
{
/* Careful: we're going to receive data from the client now,
so we have to disable the SIGPOLL handler. */
@@ -228,11 +223,12 @@ struct TunnelSource : Source
writeInt(STDERR_READ, to);
writeInt(len, to);
- string s = readString(from);
- if (s.size() != len) throw Error("not enough data");
- memcpy(data, (const unsigned char *) s.c_str(), len);
+ to.flush();
+ size_t n = readString(data, len, from);
startWork();
+ if (n == 0) throw EndOfFile("unexpected end-of-file");
+ return n;
}
};
@@ -241,11 +237,14 @@ struct TunnelSource : Source
the contents of the file to `s'. Otherwise barf. */
struct RetrieveRegularNARSink : ParseSink
{
+ bool regular;
string s;
+ RetrieveRegularNARSink() : regular(true) { }
+
void createDirectory(const Path & path)
{
- throw Error("regular file expected");
+ regular = false;
}
void receiveContents(unsigned char * data, unsigned int len)
@@ -255,7 +254,7 @@ struct RetrieveRegularNARSink : ParseSink
void createSymlink(const Path & path, const string & target)
{
- throw Error("regular file expected");
+ regular = false;
}
};
@@ -266,10 +265,11 @@ struct SavingSourceAdapter : Source
Source & orig;
string s;
SavingSourceAdapter(Source & orig) : orig(orig) { }
- void operator () (unsigned char * data, unsigned int len)
+ size_t read(unsigned char * data, size_t len)
{
- orig(data, len);
- s.append((const char *) data, len);
+ size_t n = orig.read(data, len);
+ s.append((const char *) data, n);
+ return n;
}
};
@@ -327,7 +327,7 @@ static void performOp(unsigned int clientVersion,
store->queryReferrers(path, paths);
else paths = store->queryDerivationOutputs(path);
stopWork();
- writeStringSet(paths, to);
+ writeStrings(paths, to);
break;
}
@@ -371,11 +371,11 @@ static void performOp(unsigned int clientVersion,
addToStoreFromDump(). */
ParseSink sink; /* null sink; just parse the NAR */
parseDump(sink, savedNAR);
- } else {
+ } else
parseDump(savedRegular, from);
- }
startWork();
+ if (!savedRegular.regular) throw Error("regular file expected");
Path path = dynamic_cast<LocalStore *>(store.get())
->addToStoreFromDump(recursive ? savedNAR.s : savedRegular.s, baseName, recursive, hashAlgo);
stopWork();
@@ -387,7 +387,7 @@ static void performOp(unsigned int clientVersion,
case wopAddTextToStore: {
string suffix = readString(from);
string s = readString(from);
- PathSet refs = readStorePaths(from);
+ PathSet refs = readStorePaths<PathSet>(from);
startWork();
Path path = store->addTextToStore(suffix, s, refs);
stopWork();
@@ -406,17 +406,17 @@ static void performOp(unsigned int clientVersion,
break;
}
- case wopImportPath: {
+ case wopImportPaths: {
startWork();
TunnelSource source(from);
- Path path = store->importPath(true, source);
+ Paths paths = store->importPaths(true, source);
stopWork();
- writeString(path, to);
+ writeStrings(paths, to);
break;
}
case wopBuildDerivations: {
- PathSet drvs = readStorePaths(from);
+ PathSet drvs = readStorePaths<PathSet>(from);
startWork();
store->buildDerivations(drvs);
stopWork();
@@ -474,7 +474,7 @@ static void performOp(unsigned int clientVersion,
case wopCollectGarbage: {
GCOptions options;
options.action = (GCOptions::GCAction) readInt(from);
- options.pathsToDelete = readStorePaths(from);
+ options.pathsToDelete = readStorePaths<PathSet>(from);
options.ignoreLiveness = readInt(from);
options.maxFreed = readLongLong(from);
options.maxLinks = readInt(from);
@@ -492,7 +492,7 @@ static void performOp(unsigned int clientVersion,
store->collectGarbage(options, results);
stopWork();
- writeStringSet(results.paths, to);
+ writeStrings(results.paths, to);
writeLongLong(results.bytesFreed, to);
writeLongLong(results.blocksFreed, to);
@@ -530,7 +530,7 @@ static void performOp(unsigned int clientVersion,
writeInt(res ? 1 : 0, to);
if (res) {
writeString(info.deriver, to);
- writeStringSet(info.references, to);
+ writeStrings(info.references, to);
writeLongLong(info.downloadSize, to);
if (GET_PROTOCOL_MINOR(clientVersion) >= 7)
writeLongLong(info.narSize, to);
@@ -542,7 +542,7 @@ static void performOp(unsigned int clientVersion,
startWork();
PathSet paths = store->queryValidPaths();
stopWork();
- writeStringSet(paths, to);
+ writeStrings(paths, to);
break;
}
@@ -550,12 +550,12 @@ static void performOp(unsigned int clientVersion,
startWork();
PathSet paths = store->queryFailedPaths();
stopWork();
- writeStringSet(paths, to);
+ writeStrings(paths, to);
break;
}
case wopClearFailedPaths: {
- PathSet paths = readStringSet(from);
+ PathSet paths = readStrings<PathSet>(from);
startWork();
store->clearFailedPaths(paths);
stopWork();
@@ -570,7 +570,7 @@ static void performOp(unsigned int clientVersion,
stopWork();
writeString(info.deriver, to);
writeString(printHash(info.hash), to);
- writeStringSet(info.references, to);
+ writeStrings(info.references, to);
writeInt(info.registrationTime, to);
writeLongLong(info.narSize, to);
break;
@@ -603,8 +603,8 @@ static void processConnection()
unsigned int magic = readInt(from);
if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch");
writeInt(WORKER_MAGIC_2, to);
-
writeInt(PROTOCOL_VERSION, to);
+ to.flush();
unsigned int clientVersion = readInt(from);
/* Send startup error messages to the client. */
@@ -626,9 +626,11 @@ static void processConnection()
store = boost::shared_ptr<StoreAPI>(new LocalStore());
stopWork();
+ to.flush();
} catch (Error & e) {
stopWork(false, e.msg());
+ to.flush();
return;
}
@@ -648,9 +650,19 @@ static void processConnection()
try {
performOp(clientVersion, from, to, op);
} catch (Error & e) {
+ /* If we're not in a state were we can send replies, then
+ something went wrong processing the input of the
+ client. This can happen especially if I/O errors occur
+ during addTextToStore() / importPath(). If that
+ happens, just send the error message and exit. */
+ bool errorAllowed = canSendStderr;
+ if (!errorAllowed) printMsg(lvlError, format("error processing client input: %1%") % e.msg());
stopWork(false, e.msg(), GET_PROTOCOL_MINOR(clientVersion) >= 8 ? e.status : 0);
+ if (!errorAllowed) break;
}
+ to.flush();
+
assert(!canSendStderr);
};