aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Hensing <robert@roberthensing.nl>2020-09-17 22:01:35 +0200
committerRobert Hensing <robert@roberthensing.nl>2020-09-21 07:55:47 +0200
commit8279178b078103f816bf85f5a153a4845d30cc83 (patch)
treedb0ae65fa87e37ca47b86ad0bcef6c5977394bd9
parentecc8088cb7e91e4c45787f290c8ff61547fb838a (diff)
Move FramedSink next to FramedSource
-rw-r--r--src/libstore/remote-store.cc35
-rw-r--r--src/libutil/serialise.hh46
2 files changed, 45 insertions, 36 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 95e6a3291..cfbf23ac4 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -957,39 +957,6 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
-
-struct FramedSink : nix::BufferedSink
-{
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
-};
-
void
ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
(*this)->to.flush();
@@ -1022,7 +989,7 @@ ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
});
{
- FramedSink sink(*this, ex);
+ FramedSink sink((*this)->to, ex);
fun(sink);
sink.flush();
}
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 6027d5961..3511e4cd8 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -406,8 +406,13 @@ struct StreamToSourceAdapter : Source
};
-/* Like SizedSource, but guarantees that the whole frame is consumed after
- destruction. This ensures that the original stream is in a known state. */
+/* A source that reads a distinct format of concatenated chunks back into its
+ logical form, in order to guarantee a known state to the original stream,
+ even in the event of errors.
+
+ Use with FramedSink, which also allows the logical stream to be terminated
+ in the event of an exception.
+*/
struct FramedSource : Source
{
Source & from;
@@ -452,5 +457,42 @@ struct FramedSource : Source
}
};
+/* Write as chunks in the format expected by FramedSource.
+
+ The exception_ptr reference can be used to terminate the stream when you
+ detect that an error has occurred on the remote end.
+*/
+struct FramedSink : nix::BufferedSink
+{
+ BufferedSink & to;
+ std::exception_ptr & ex;
+
+ FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
+ { }
+
+ ~FramedSink()
+ {
+ try {
+ to << 0;
+ to.flush();
+ } catch (...) {
+ ignoreException();
+ }
+ }
+
+ void write(const unsigned char * data, size_t len) override
+ {
+ /* Don't send more data if the remote has
+ encountered an error. */
+ if (ex) {
+ auto ex2 = ex;
+ ex = nullptr;
+ std::rethrow_exception(ex2);
+ }
+ to << len;
+ to(data, len);
+ };
+};
+
}