aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2017-08-28 14:17:07 +0200
committerEelco Dolstra <edolstra@gmail.com>2017-08-28 14:17:07 +0200
commit8fff3e7bb50377e48fb9c672e6551abae0fdf03d (patch)
tree6e5351a7b62c6ee24d6d85c4f02fc867d6327d08 /src
parent94a0548dc420e23c64b18d36ea8d50a2afbce6fb (diff)
Make TunnelLogger thread-safe
Now that we use threads in lots of places, it's possible for TunnelLogger::log() to be called asynchronously from other threads than the main loop. So we need to ensure that STDERR_NEXT messages don't clobber other messages.
Diffstat (limited to 'src')
-rw-r--r--src/nix-daemon/nix-daemon.cc238
1 files changed, 129 insertions, 109 deletions
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index 7e6f3aa25..3237333f4 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -8,6 +8,7 @@
#include "globals.hh"
#include "monitor-fd.hh"
#include "derivations.hh"
+#include "finally.hh"
#include <algorithm>
@@ -54,57 +55,73 @@ static ssize_t splice(int fd_in, void *off_in, int fd_out, void *off_out, size_t
static FdSource from(STDIN_FILENO);
static FdSink to(STDOUT_FILENO);
-static bool canSendStderr;
-
-static Logger * defaultLogger;
-
/* Logger that forwards log messages to the client, *if* we're in a
state where the protocol allows it (i.e., when canSendStderr is
true). */
-class TunnelLogger : public Logger
+struct TunnelLogger : public Logger
{
+ struct State
+ {
+ bool canSendStderr = false;
+ std::vector<std::string> pendingMsgs;
+ };
+
+ Sync<State> state_;
+
void log(Verbosity lvl, const FormatOrString & fs) override
{
if (lvl > verbosity) return;
- if (canSendStderr) {
+ auto state(state_.lock());
+
+ if (state->canSendStderr) {
try {
to << STDERR_NEXT << (fs.s + "\n");
to.flush();
} catch (...) {
/* Write failed; that means that the other side is
gone. */
- canSendStderr = false;
+ state->canSendStderr = false;
throw;
}
} else
- defaultLogger->log(lvl, fs);
+ state->pendingMsgs.push_back(fs.s);
}
-};
+ /* startWork() means that we're starting an operation for which we
+ want to send out stderr to the client. */
+ void startWork()
+ {
+ std::vector<std::string> pendingMsgs;
-/* startWork() means that we're starting an operation for which we
- want to send out stderr to the client. */
-static void startWork()
-{
- canSendStderr = true;
-}
+ auto state(state_.lock());
+ state->canSendStderr = true;
+ for (auto & msg : state->pendingMsgs)
+ to << STDERR_NEXT << (msg + "\n");
-/* stopWork() means that we're done; stop sending stderr to the
- client. */
-static void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
-{
- canSendStderr = false;
+ state->pendingMsgs.clear();
- if (success)
- to << STDERR_LAST;
- else {
- to << STDERR_ERROR << msg;
- if (status != 0) to << status;
+ to.flush();
}
-}
+
+ /* stopWork() means that we're done; stop sending stderr to the
+ client. */
+ void stopWork(bool success = true, const string & msg = "", unsigned int status = 0)
+ {
+ auto state(state_.lock());
+
+ state->canSendStderr = false;
+
+ if (success)
+ to << STDERR_LAST;
+ else {
+ to << STDERR_ERROR << msg;
+ if (status != 0) to << status;
+ }
+ }
+};
struct TunnelSink : Sink
@@ -160,7 +177,8 @@ struct RetrieveRegularNARSink : ParseSink
};
-static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVersion,
+static void performOp(TunnelLogger * logger, ref<LocalStore> store,
+ bool trusted, unsigned int clientVersion,
Source & from, Sink & to, unsigned int op)
{
switch (op) {
@@ -172,46 +190,46 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
that the 'Error' exception handler doesn't close the
connection. */
Path path = readString(from);
- startWork();
+ logger->startWork();
store->assertStorePath(path);
bool result = store->isValidPath(path);
- stopWork();
+ logger->stopWork();
to << result;
break;
}
case wopQueryValidPaths: {
PathSet paths = readStorePaths<PathSet>(*store, from);
- startWork();
+ logger->startWork();
PathSet res = store->queryValidPaths(paths);
- stopWork();
+ logger->stopWork();
to << res;
break;
}
case wopHasSubstitutes: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
PathSet res = store->querySubstitutablePaths({path});
- stopWork();
+ logger->stopWork();
to << (res.find(path) != res.end());
break;
}
case wopQuerySubstitutablePaths: {
PathSet paths = readStorePaths<PathSet>(*store, from);
- startWork();
+ logger->startWork();
PathSet res = store->querySubstitutablePaths(paths);
- stopWork();
+ logger->stopWork();
to << res;
break;
}
case wopQueryPathHash: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
auto hash = store->queryPathInfo(path)->narHash;
- stopWork();
+ logger->stopWork();
to << hash.to_string(Base16, false);
break;
}
@@ -221,7 +239,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQueryValidDerivers:
case wopQueryDerivationOutputs: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
PathSet paths;
if (op == wopQueryReferences)
paths = store->queryPathInfo(path)->references;
@@ -230,35 +248,35 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
else if (op == wopQueryValidDerivers)
paths = store->queryValidDerivers(path);
else paths = store->queryDerivationOutputs(path);
- stopWork();
+ logger->stopWork();
to << paths;
break;
}
case wopQueryDerivationOutputNames: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
StringSet names;
names = store->queryDerivationOutputNames(path);
- stopWork();
+ logger->stopWork();
to << names;
break;
}
case wopQueryDeriver: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
auto deriver = store->queryPathInfo(path)->deriver;
- stopWork();
+ logger->stopWork();
to << deriver;
break;
}
case wopQueryPathFromHashPart: {
string hashPart = readString(from);
- startWork();
+ logger->startWork();
Path path = store->queryPathFromHashPart(hashPart);
- stopWork();
+ logger->stopWork();
to << path;
break;
}
@@ -286,10 +304,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
} else
parseDump(savedRegular, from);
- startWork();
+ logger->startWork();
if (!savedRegular.regular) throw Error("regular file expected");
Path path = store->addToStoreFromDump(recursive ? *savedNAR.data : savedRegular.s, baseName, recursive, hashAlgo);
- stopWork();
+ logger->stopWork();
to << path;
break;
@@ -299,9 +317,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
string suffix = readString(from);
string s = readString(from);
PathSet refs = readStorePaths<PathSet>(*store, from);
- startWork();
+ logger->startWork();
Path path = store->addTextToStore(suffix, s, refs, NoRepair);
- stopWork();
+ logger->stopWork();
to << path;
break;
}
@@ -309,20 +327,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopExportPath: {
Path path = readStorePath(*store, from);
readInt(from); // obsolete
- startWork();
+ logger->startWork();
TunnelSink sink(to);
store->exportPath(path, sink);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopImportPaths: {
- startWork();
+ logger->startWork();
TunnelSource source(from);
Paths paths = store->importPaths(source, nullptr,
trusted ? NoCheckSigs : CheckSigs);
- stopWork();
+ logger->stopWork();
to << paths;
break;
}
@@ -338,9 +356,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
if (mode == bmRepair && !trusted)
throw Error("repairing is not supported when building through the Nix daemon");
}
- startWork();
+ logger->startWork();
store->buildPaths(drvs, mode);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
@@ -350,54 +368,54 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
BasicDerivation drv;
readDerivation(from, *store, drv);
BuildMode buildMode = (BuildMode) readInt(from);
- startWork();
+ logger->startWork();
if (!trusted)
throw Error("you are not privileged to build derivations");
auto res = store->buildDerivation(drvPath, drv, buildMode);
- stopWork();
+ logger->stopWork();
to << res.status << res.errorMsg;
break;
}
case wopEnsurePath: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
store->ensurePath(path);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopAddTempRoot: {
Path path = readStorePath(*store, from);
- startWork();
+ logger->startWork();
store->addTempRoot(path);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopAddIndirectRoot: {
Path path = absPath(readString(from));
- startWork();
+ logger->startWork();
store->addIndirectRoot(path);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopSyncWithGC: {
- startWork();
+ logger->startWork();
store->syncWithGC();
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopFindRoots: {
- startWork();
+ logger->startWork();
Roots roots = store->findRoots();
- stopWork();
+ logger->stopWork();
to << roots.size();
for (auto & i : roots)
to << i.first << i.second;
@@ -416,11 +434,11 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
GCResults results;
- startWork();
+ logger->startWork();
if (options.ignoreLiveness)
throw Error("you are not allowed to ignore liveness");
store->collectGarbage(options, results);
- stopWork();
+ logger->stopWork();
to << results.paths << results.bytesFreed << 0 /* obsolete */;
@@ -451,7 +469,7 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
}
}
- startWork();
+ logger->startWork();
for (auto & i : overrides) {
auto & name(i.first);
@@ -492,16 +510,16 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
}
}
- stopWork();
+ logger->stopWork();
break;
}
case wopQuerySubstitutablePathInfo: {
Path path = absPath(readString(from));
- startWork();
+ logger->startWork();
SubstitutablePathInfos infos;
store->querySubstitutablePathInfos({path}, infos);
- stopWork();
+ logger->stopWork();
SubstitutablePathInfos::iterator i = infos.find(path);
if (i == infos.end())
to << 0;
@@ -513,10 +531,10 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQuerySubstitutablePathInfos: {
PathSet paths = readStorePaths<PathSet>(*store, from);
- startWork();
+ logger->startWork();
SubstitutablePathInfos infos;
store->querySubstitutablePathInfos(paths, infos);
- stopWork();
+ logger->stopWork();
to << infos.size();
for (auto & i : infos) {
to << i.first << i.second.deriver << i.second.references
@@ -526,9 +544,9 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
}
case wopQueryAllValidPaths: {
- startWork();
+ logger->startWork();
PathSet paths = store->queryAllValidPaths();
- stopWork();
+ logger->stopWork();
to << paths;
break;
}
@@ -536,13 +554,13 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopQueryPathInfo: {
Path path = readStorePath(*store, from);
std::shared_ptr<const ValidPathInfo> info;
- startWork();
+ logger->startWork();
try {
info = store->queryPathInfo(path);
} catch (InvalidPath &) {
if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw;
}
- stopWork();
+ logger->stopWork();
if (info) {
if (GET_PROTOCOL_MINOR(clientVersion) >= 17)
to << 1;
@@ -561,20 +579,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
}
case wopOptimiseStore:
- startWork();
+ logger->startWork();
store->optimiseStore();
- stopWork();
+ logger->stopWork();
to << 1;
break;
case wopVerifyStore: {
bool checkContents, repair;
from >> checkContents >> repair;
- startWork();
+ logger->startWork();
if (repair && !trusted)
throw Error("you are not privileged to repair paths");
bool errors = store->verifyStore(checkContents, (RepairFlag) repair);
- stopWork();
+ logger->stopWork();
to << errors;
break;
}
@@ -582,19 +600,19 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
case wopAddSignatures: {
Path path = readStorePath(*store, from);
StringSet sigs = readStrings<StringSet>(from);
- startWork();
+ logger->startWork();
if (!trusted)
throw Error("you are not privileged to add signatures");
store->addSignatures(path, sigs);
- stopWork();
+ logger->stopWork();
to << 1;
break;
}
case wopNarFromPath: {
auto path = readStorePath(*store, from);
- startWork();
- stopWork();
+ logger->startWork();
+ logger->stopWork();
dumpPath(path, to);
break;
}
@@ -619,20 +637,20 @@ static void performOp(ref<LocalStore> store, bool trusted, unsigned int clientVe
TeeSink tee(from);
parseDump(tee, tee.source);
- startWork();
+ logger->startWork();
store->addToStore(info, tee.source.data, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs, nullptr);
- stopWork();
+ logger->stopWork();
break;
}
case wopQueryMissing: {
PathSet targets = readStorePaths<PathSet>(*store, from);
- startWork();
+ logger->startWork();
PathSet willBuild, willSubstitute, unknown;
unsigned long long downloadSize, narSize;
store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize);
- stopWork();
+ logger->stopWork();
to << willBuild << willSubstitute << unknown << downloadSize << narSize;
break;
}
@@ -647,9 +665,17 @@ static void processConnection(bool trusted)
{
MonitorFdHup monitor(from.fd);
- canSendStderr = false;
- defaultLogger = logger;
- logger = new TunnelLogger();
+ TunnelLogger tunnelLogger;
+ auto prevLogger = nix::logger;
+ logger = &tunnelLogger;
+
+ unsigned int opCount = 0;
+
+ Finally finally([&]() {
+ logger = prevLogger;
+ _isInterrupted = false;
+ debug("%d operations", opCount);
+ });
/* Exchange the greeting. */
unsigned int magic = readInt(from);
@@ -667,7 +693,7 @@ static void processConnection(bool trusted)
readInt(from); // obsolete reserveSpace
/* Send startup error messages to the client. */
- startWork();
+ tunnelLogger.startWork();
try {
@@ -687,12 +713,10 @@ static void processConnection(bool trusted)
params["path-info-cache-size"] = "0";
auto store = make_ref<LocalStore>(params);
- stopWork();
+ tunnelLogger.stopWork();
to.flush();
/* Process client requests. */
- unsigned int opCount = 0;
-
while (true) {
WorkerOp op;
try {
@@ -706,32 +730,28 @@ static void processConnection(bool trusted)
opCount++;
try {
- performOp(store, trusted, clientVersion, from, to, op);
+ performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
} catch (Error & e) {
/* If we're not in a state where we can send replies, then
something went wrong processing the input of the
client. This can happen especially if I/O errors occur
during addTextToStore() / importPath(). If that
happens, just send the error message and exit. */
- bool errorAllowed = canSendStderr;
- stopWork(false, e.msg(), e.status);
+ bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
+ tunnelLogger.stopWork(false, e.msg(), e.status);
if (!errorAllowed) throw;
} catch (std::bad_alloc & e) {
- stopWork(false, "Nix daemon out of memory", 1);
+ tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
throw;
}
to.flush();
- assert(!canSendStderr);
+ assert(!tunnelLogger.state_.lock()->canSendStderr);
};
- canSendStderr = false;
- _isInterrupted = false;
- debug(format("%1% operations") % opCount);
-
- } catch (Error & e) {
- stopWork(false, e.msg(), 1);
+ } catch (std::exception & e) {
+ tunnelLogger.stopWork(false, e.what(), 1);
to.flush();
return;
}