diff options
Diffstat (limited to 'src/nix-worker')
-rw-r--r-- | src/nix-worker/nix-worker.cc | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/src/nix-worker/nix-worker.cc b/src/nix-worker/nix-worker.cc index 3587bd7fd..68567f341 100644 --- a/src/nix-worker/nix-worker.cc +++ b/src/nix-worker/nix-worker.cc @@ -56,7 +56,8 @@ static void tunnelStderr(const unsigned char * buf, size_t count) if (canSendStderr && myPid == getpid()) { try { writeInt(STDERR_NEXT, to); - writeString(string((char *) buf, count), to); + writeString(buf, count, to); + to.flush(); } catch (...) { /* Write failed; that means that the other side is gone. */ @@ -200,26 +201,20 @@ static void stopWork(bool success = true, const string & msg = "", unsigned int struct TunnelSink : Sink { Sink & to; - TunnelSink(Sink & to) : to(to) - { - } - virtual void operator () - (const unsigned char * data, unsigned int len) + TunnelSink(Sink & to) : to(to) { } + virtual void operator () (const unsigned char * data, size_t len) { writeInt(STDERR_WRITE, to); - writeString(string((const char *) data, len), to); + writeString(data, len, to); } }; -struct TunnelSource : Source +struct TunnelSource : BufferedSource { Source & from; - TunnelSource(Source & from) : from(from) - { - } - virtual void operator () - (unsigned char * data, unsigned int len) + TunnelSource(Source & from) : from(from) { } + size_t readUnbuffered(unsigned char * data, size_t len) { /* Careful: we're going to receive data from the client now, so we have to disable the SIGPOLL handler. */ @@ -228,11 +223,12 @@ struct TunnelSource : Source writeInt(STDERR_READ, to); writeInt(len, to); - string s = readString(from); - if (s.size() != len) throw Error("not enough data"); - memcpy(data, (const unsigned char *) s.c_str(), len); + to.flush(); + size_t n = readString(data, len, from); startWork(); + if (n == 0) throw EndOfFile("unexpected end-of-file"); + return n; } }; @@ -241,11 +237,14 @@ struct TunnelSource : Source the contents of the file to `s'. Otherwise barf. */ struct RetrieveRegularNARSink : ParseSink { + bool regular; string s; + RetrieveRegularNARSink() : regular(true) { } + void createDirectory(const Path & path) { - throw Error("regular file expected"); + regular = false; } void receiveContents(unsigned char * data, unsigned int len) @@ -255,7 +254,7 @@ struct RetrieveRegularNARSink : ParseSink void createSymlink(const Path & path, const string & target) { - throw Error("regular file expected"); + regular = false; } }; @@ -266,10 +265,11 @@ struct SavingSourceAdapter : Source Source & orig; string s; SavingSourceAdapter(Source & orig) : orig(orig) { } - void operator () (unsigned char * data, unsigned int len) + size_t read(unsigned char * data, size_t len) { - orig(data, len); - s.append((const char *) data, len); + size_t n = orig.read(data, len); + s.append((const char *) data, n); + return n; } }; @@ -327,7 +327,7 @@ static void performOp(unsigned int clientVersion, store->queryReferrers(path, paths); else paths = store->queryDerivationOutputs(path); stopWork(); - writeStringSet(paths, to); + writeStrings(paths, to); break; } @@ -371,11 +371,11 @@ static void performOp(unsigned int clientVersion, addToStoreFromDump(). */ ParseSink sink; /* null sink; just parse the NAR */ parseDump(sink, savedNAR); - } else { + } else parseDump(savedRegular, from); - } startWork(); + if (!savedRegular.regular) throw Error("regular file expected"); Path path = dynamic_cast<LocalStore *>(store.get()) ->addToStoreFromDump(recursive ? savedNAR.s : savedRegular.s, baseName, recursive, hashAlgo); stopWork(); @@ -387,7 +387,7 @@ static void performOp(unsigned int clientVersion, case wopAddTextToStore: { string suffix = readString(from); string s = readString(from); - PathSet refs = readStorePaths(from); + PathSet refs = readStorePaths<PathSet>(from); startWork(); Path path = store->addTextToStore(suffix, s, refs); stopWork(); @@ -406,17 +406,17 @@ static void performOp(unsigned int clientVersion, break; } - case wopImportPath: { + case wopImportPaths: { startWork(); TunnelSource source(from); - Path path = store->importPath(true, source); + Paths paths = store->importPaths(true, source); stopWork(); - writeString(path, to); + writeStrings(paths, to); break; } case wopBuildDerivations: { - PathSet drvs = readStorePaths(from); + PathSet drvs = readStorePaths<PathSet>(from); startWork(); store->buildDerivations(drvs); stopWork(); @@ -474,7 +474,7 @@ static void performOp(unsigned int clientVersion, case wopCollectGarbage: { GCOptions options; options.action = (GCOptions::GCAction) readInt(from); - options.pathsToDelete = readStorePaths(from); + options.pathsToDelete = readStorePaths<PathSet>(from); options.ignoreLiveness = readInt(from); options.maxFreed = readLongLong(from); options.maxLinks = readInt(from); @@ -492,7 +492,7 @@ static void performOp(unsigned int clientVersion, store->collectGarbage(options, results); stopWork(); - writeStringSet(results.paths, to); + writeStrings(results.paths, to); writeLongLong(results.bytesFreed, to); writeLongLong(results.blocksFreed, to); @@ -530,7 +530,7 @@ static void performOp(unsigned int clientVersion, writeInt(res ? 1 : 0, to); if (res) { writeString(info.deriver, to); - writeStringSet(info.references, to); + writeStrings(info.references, to); writeLongLong(info.downloadSize, to); if (GET_PROTOCOL_MINOR(clientVersion) >= 7) writeLongLong(info.narSize, to); @@ -542,7 +542,7 @@ static void performOp(unsigned int clientVersion, startWork(); PathSet paths = store->queryValidPaths(); stopWork(); - writeStringSet(paths, to); + writeStrings(paths, to); break; } @@ -550,12 +550,12 @@ static void performOp(unsigned int clientVersion, startWork(); PathSet paths = store->queryFailedPaths(); stopWork(); - writeStringSet(paths, to); + writeStrings(paths, to); break; } case wopClearFailedPaths: { - PathSet paths = readStringSet(from); + PathSet paths = readStrings<PathSet>(from); startWork(); store->clearFailedPaths(paths); stopWork(); @@ -570,7 +570,7 @@ static void performOp(unsigned int clientVersion, stopWork(); writeString(info.deriver, to); writeString(printHash(info.hash), to); - writeStringSet(info.references, to); + writeStrings(info.references, to); writeInt(info.registrationTime, to); writeLongLong(info.narSize, to); break; @@ -603,8 +603,8 @@ static void processConnection() unsigned int magic = readInt(from); if (magic != WORKER_MAGIC_1) throw Error("protocol mismatch"); writeInt(WORKER_MAGIC_2, to); - writeInt(PROTOCOL_VERSION, to); + to.flush(); unsigned int clientVersion = readInt(from); /* Send startup error messages to the client. */ @@ -626,9 +626,11 @@ static void processConnection() store = boost::shared_ptr<StoreAPI>(new LocalStore()); stopWork(); + to.flush(); } catch (Error & e) { stopWork(false, e.msg()); + to.flush(); return; } @@ -648,9 +650,19 @@ static void processConnection() try { performOp(clientVersion, from, to, op); } catch (Error & e) { + /* If we're not in a state were we can send replies, then + something went wrong processing the input of the + client. This can happen especially if I/O errors occur + during addTextToStore() / importPath(). If that + happens, just send the error message and exit. */ + bool errorAllowed = canSendStderr; + if (!errorAllowed) printMsg(lvlError, format("error processing client input: %1%") % e.msg()); stopWork(false, e.msg(), GET_PROTOCOL_MINOR(clientVersion) >= 8 ? e.status : 0); + if (!errorAllowed) break; } + to.flush(); + assert(!canSendStderr); }; |