aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/remote-store.cc69
-rw-r--r--src/libstore/worker-protocol.hh3
-rw-r--r--src/libutil/logging.hh2
-rw-r--r--src/nix-daemon/nix-daemon.cc80
4 files changed, 124 insertions, 30 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index e9f2cee80..f0e3502bf 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -629,17 +629,37 @@ RemoteStore::Connection::~Connection()
}
+static Logger::Fields readFields(Source & from)
+{
+ Logger::Fields fields;
+ size_t size = readInt(from);
+ for (size_t n = 0; n < size; n++) {
+ auto type = (decltype(Logger::Field::type)) readInt(from);
+ if (type == Logger::Field::tInt)
+ fields.push_back(readNum<uint64_t>(from));
+ else if (type == Logger::Field::tString)
+ fields.push_back(readString(from));
+ else
+ throw Error("got unsupported field type %x from Nix daemon", (int) type);
+ }
+ return fields;
+}
+
+
void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
{
to.flush();
- unsigned int msg;
- while ((msg = readInt(from)) == STDERR_NEXT
- || msg == STDERR_READ || msg == STDERR_WRITE) {
+
+ while (true) {
+
+ auto msg = readNum<uint64_t>(from);
+
if (msg == STDERR_WRITE) {
string s = readString(from);
if (!sink) throw Error("no sink");
(*sink)(s);
}
+
else if (msg == STDERR_READ) {
if (!source) throw Error("no source");
size_t len = readNum<size_t>(from);
@@ -647,16 +667,43 @@ void RemoteStore::Connection::processStderr(Sink * sink, Source * source)
writeString(buf.get(), source->read(buf.get(), len), to);
to.flush();
}
- else
+
+ else if (msg == STDERR_ERROR) {
+ string error = readString(from);
+ unsigned int status = readInt(from);
+ throw Error(status, error);
+ }
+
+ else if (msg == STDERR_NEXT)
printError(chomp(readString(from)));
+
+ else if (msg == STDERR_START_ACTIVITY) {
+ auto act = readNum<ActivityId>(from);
+ auto type = (ActivityType) readInt(from);
+ auto s = readString(from);
+ auto fields = readFields(from);
+ auto parent = readNum<ActivityId>(from);
+ logger->startActivity(act, type, s, fields, parent);
+ }
+
+ else if (msg == STDERR_STOP_ACTIVITY) {
+ auto act = readNum<ActivityId>(from);
+ logger->stopActivity(act);
+ }
+
+ else if (msg == STDERR_RESULT) {
+ auto act = readNum<ActivityId>(from);
+ auto type = (ResultType) readInt(from);
+ auto fields = readFields(from);
+ logger->result(act, type, fields);
+ }
+
+ else if (msg == STDERR_LAST)
+ break;
+
+ else
+ throw Error("got unknown message type %x from Nix daemon", msg);
}
- if (msg == STDERR_ERROR) {
- string error = readString(from);
- unsigned int status = readInt(from);
- throw Error(status, error);
- }
- else if (msg != STDERR_LAST)
- throw Error("protocol error processing standard error");
}
diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh
index 6c6766b36..9daeb46ad 100644
--- a/src/libstore/worker-protocol.hh
+++ b/src/libstore/worker-protocol.hh
@@ -57,6 +57,9 @@ typedef enum {
#define STDERR_WRITE 0x64617416 // data for sink
#define STDERR_LAST 0x616c7473
#define STDERR_ERROR 0x63787470
+#define STDERR_START_ACTIVITY 0x53545254
+#define STDERR_STOP_ACTIVITY 0x53544f50
+#define STDERR_RESULT 0x52534c54
Path readStorePath(Store & store, Source & from);
diff --git a/src/libutil/logging.hh b/src/libutil/logging.hh
index 2ec15cb68..e3e7c8e6f 100644
--- a/src/libutil/logging.hh
+++ b/src/libutil/logging.hh
@@ -47,7 +47,7 @@ public:
struct Field
{
// FIXME: use std::variant.
- enum { tInt, tString } type;
+ enum { tInt = 0, tString = 1 } type;
uint64_t i = 0;
std::string s;
Field(const std::string & s) : type(tString), s(s) { }
diff --git a/src/nix-daemon/nix-daemon.cc b/src/nix-daemon/nix-daemon.cc
index 3237333f4..65c88562c 100644
--- a/src/nix-daemon/nix-daemon.cc
+++ b/src/nix-daemon/nix-daemon.cc
@@ -56,6 +56,21 @@ static FdSource from(STDIN_FILENO);
static FdSink to(STDOUT_FILENO);
+Sink & operator << (Sink & sink, const Logger::Fields & fields)
+{
+ sink << fields.size();
+ for (auto & f : fields) {
+ sink << f.type;
+ if (f.type == Logger::Field::tInt)
+ sink << f.i;
+ else if (f.type == Logger::Field::tString)
+ sink << f.s;
+ else abort();
+ }
+ return sink;
+}
+
+
/* Logger that forwards log messages to the client, *if* we're in a
state where the protocol allows it (i.e., when canSendStderr is
true). */
@@ -69,15 +84,14 @@ struct TunnelLogger : public Logger
Sync<State> state_;
- void log(Verbosity lvl, const FormatOrString & fs) override
+ void enqueueMsg(const std::string & s)
{
- if (lvl > verbosity) return;
-
auto state(state_.lock());
if (state->canSendStderr) {
+ assert(state->pendingMsgs.empty());
try {
- to << STDERR_NEXT << (fs.s + "\n");
+ to(s);
to.flush();
} catch (...) {
/* Write failed; that means that the other side is
@@ -86,7 +100,16 @@ struct TunnelLogger : public Logger
throw;
}
} else
- state->pendingMsgs.push_back(fs.s);
+ state->pendingMsgs.push_back(s);
+ }
+
+ void log(Verbosity lvl, const FormatOrString & fs) override
+ {
+ if (lvl > verbosity) return;
+
+ StringSink buf;
+ buf << STDERR_NEXT << (fs.s + "\n");
+ enqueueMsg(*buf.s);
}
/* startWork() means that we're starting an operation for which we
@@ -99,7 +122,7 @@ struct TunnelLogger : public Logger
state->canSendStderr = true;
for (auto & msg : state->pendingMsgs)
- to << STDERR_NEXT << (msg + "\n");
+ to(msg);
state->pendingMsgs.clear();
@@ -121,6 +144,28 @@ struct TunnelLogger : public Logger
if (status != 0) to << status;
}
}
+
+ void startActivity(ActivityId act, ActivityType type,
+ const std::string & s, const Fields & fields, ActivityId parent) override
+ {
+ StringSink buf;
+ buf << STDERR_START_ACTIVITY << act << type << s << fields << parent;
+ enqueueMsg(*buf.s);
+ }
+
+ void stopActivity(ActivityId act) override
+ {
+ StringSink buf;
+ buf << STDERR_STOP_ACTIVITY << act;
+ enqueueMsg(*buf.s);
+ }
+
+ void result(ActivityId act, ResultType type, const Fields & fields) override
+ {
+ StringSink buf;
+ buf << STDERR_RESULT << act << type << fields;
+ enqueueMsg(*buf.s);
+ }
};
@@ -665,16 +710,15 @@ static void processConnection(bool trusted)
{
MonitorFdHup monitor(from.fd);
- TunnelLogger tunnelLogger;
+ auto tunnelLogger = new TunnelLogger();
auto prevLogger = nix::logger;
- logger = &tunnelLogger;
+ logger = tunnelLogger;
unsigned int opCount = 0;
Finally finally([&]() {
- logger = prevLogger;
_isInterrupted = false;
- debug("%d operations", opCount);
+ prevLogger->log(lvlDebug, fmt("%d operations", opCount));
});
/* Exchange the greeting. */
@@ -693,7 +737,7 @@ static void processConnection(bool trusted)
readInt(from); // obsolete reserveSpace
/* Send startup error messages to the client. */
- tunnelLogger.startWork();
+ tunnelLogger->startWork();
try {
@@ -713,7 +757,7 @@ static void processConnection(bool trusted)
params["path-info-cache-size"] = "0";
auto store = make_ref<LocalStore>(params);
- tunnelLogger.stopWork();
+ tunnelLogger->stopWork();
to.flush();
/* Process client requests. */
@@ -730,28 +774,28 @@ static void processConnection(bool trusted)
opCount++;
try {
- performOp(&tunnelLogger, store, trusted, clientVersion, from, to, op);
+ performOp(tunnelLogger, store, trusted, clientVersion, from, to, op);
} catch (Error & e) {
/* If we're not in a state where we can send replies, then
something went wrong processing the input of the
client. This can happen especially if I/O errors occur
during addTextToStore() / importPath(). If that
happens, just send the error message and exit. */
- bool errorAllowed = tunnelLogger.state_.lock()->canSendStderr;
- tunnelLogger.stopWork(false, e.msg(), e.status);
+ bool errorAllowed = tunnelLogger->state_.lock()->canSendStderr;
+ tunnelLogger->stopWork(false, e.msg(), e.status);
if (!errorAllowed) throw;
} catch (std::bad_alloc & e) {
- tunnelLogger.stopWork(false, "Nix daemon out of memory", 1);
+ tunnelLogger->stopWork(false, "Nix daemon out of memory", 1);
throw;
}
to.flush();
- assert(!tunnelLogger.state_.lock()->canSendStderr);
+ assert(!tunnelLogger->state_.lock()->canSendStderr);
};
} catch (std::exception & e) {
- tunnelLogger.stopWork(false, e.what(), 1);
+ tunnelLogger->stopWork(false, e.what(), 1);
to.flush();
return;
}