aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/remote-store.cc
diff options
context:
space:
mode:
authorRobert Hensing <robert@roberthensing.nl>2020-09-17 17:36:16 +0200
committerRobert Hensing <robert@roberthensing.nl>2020-09-17 20:21:04 +0200
commit14b30b3f3d5af75c210a15cb128e67c0eff66149 (patch)
tree81db6f5789083371286a1efa1bd347066c058e43 /src/libstore/remote-store.cc
parentdfa547c6a81d6fb7ef3d3f69a98ebe969df42828 (diff)
Move FramedSource and FramedSink, extract withFramedSink
Diffstat (limited to 'src/libstore/remote-store.cc')
-rw-r--r--src/libstore/remote-store.cc151
1 files changed, 80 insertions, 71 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index e92b94975..993b05dc0 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -306,6 +306,8 @@ struct ConnectionHandle
std::rethrow_exception(ex);
}
}
+
+ void withFramedSink(std::function<void(Sink & sink)> fun);
};
@@ -564,78 +566,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< repair << !checkSigs;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
-
- conn->to.flush();
-
- std::exception_ptr ex;
-
- struct FramedSink : BufferedSink
- {
- ConnectionHandle & conn;
- std::exception_ptr & ex;
-
- FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
- { }
-
- ~FramedSink()
- {
- try {
- conn->to << 0;
- conn->to.flush();
- } catch (...) {
- ignoreException();
- }
- }
-
- void write(const unsigned char * data, size_t len) override
- {
- /* Don't send more data if the remote has
- encountered an error. */
- if (ex) {
- auto ex2 = ex;
- ex = nullptr;
- std::rethrow_exception(ex2);
- }
- conn->to << len;
- conn->to(data, len);
- };
- };
-
- /* Handle log messages / exceptions from the remote on a
- separate thread. */
- std::thread stderrThread([&]()
- {
- try {
- conn.processStderr(nullptr, nullptr, false);
- } catch (...) {
- ex = std::current_exception();
- }
- });
-
- Finally joinStderrThread([&]()
- {
- if (stderrThread.joinable()) {
- stderrThread.join();
- if (ex) {
- try {
- std::rethrow_exception(ex);
- } catch (...) {
- ignoreException();
- }
- }
- }
- });
-
- {
- FramedSink sink(conn, ex);
+ conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
- sink.flush();
- }
-
- stderrThread.join();
- if (ex)
- std::rethrow_exception(ex);
-
+ });
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
@@ -992,6 +925,82 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
return nullptr;
}
+
+struct FramedSink : nix::BufferedSink
+{
+ ConnectionHandle & conn;
+ std::exception_ptr & ex;
+
+ FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
+ { }
+
+ ~FramedSink()
+ {
+ try {
+ conn->to << 0;
+ conn->to.flush();
+ } catch (...) {
+ ignoreException();
+ }
+ }
+
+ void write(const unsigned char * data, size_t len) override
+ {
+ /* Don't send more data if the remote has
+ encountered an error. */
+ if (ex) {
+ auto ex2 = ex;
+ ex = nullptr;
+ std::rethrow_exception(ex2);
+ }
+ conn->to << len;
+ conn->to(data, len);
+ };
+};
+
+void
+ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun) {
+ (*this)->to.flush();
+
+ std::exception_ptr ex;
+
+ /* Handle log messages / exceptions from the remote on a
+ separate thread. */
+ std::thread stderrThread([&]()
+ {
+ try {
+ processStderr(nullptr, nullptr, false);
+ } catch (...) {
+ ex = std::current_exception();
+ }
+ });
+
+ Finally joinStderrThread([&]()
+ {
+ if (stderrThread.joinable()) {
+ stderrThread.join();
+ if (ex) {
+ try {
+ std::rethrow_exception(ex);
+ } catch (...) {
+ ignoreException();
+ }
+ }
+ }
+ });
+
+ {
+ FramedSink sink(*this, ex);
+ fun(sink);
+ sink.flush();
+ }
+
+ stderrThread.join();
+ if (ex)
+ std::rethrow_exception(ex);
+
+}
+
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
}