aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/filetransfer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/filetransfer.cc')
-rw-r--r--src/libstore/filetransfer.cc198
1 files changed, 99 insertions, 99 deletions
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 9dc742220..b4887d9e8 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -696,6 +696,105 @@ struct curlFileTransfer : public FileTransfer
enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback)));
}
+
+ void download(FileTransferRequest && request, Sink & sink) override
+ {
+ /* Note: we can't call 'sink' via request.dataCallback, because
+ that would cause the sink to execute on the fileTransfer
+ thread. If 'sink' is a coroutine, this will fail. Also, if the
+ sink is expensive (e.g. one that does decompression and writing
+ to the Nix store), it would stall the download thread too much.
+ Therefore we use a buffer to communicate data between the
+ download thread and the calling thread. */
+
+ struct State {
+ bool quit = false;
+ std::exception_ptr exc;
+ std::string data;
+ std::condition_variable avail, request;
+ };
+
+ auto _state = std::make_shared<Sync<State>>();
+
+ /* In case of an exception, wake up the download thread. FIXME:
+ abort the download request. */
+ Finally finally([&]() {
+ auto state(_state->lock());
+ state->quit = true;
+ state->request.notify_one();
+ });
+
+ request.dataCallback = [_state](std::string_view data) {
+
+ auto state(_state->lock());
+
+ if (state->quit) return;
+
+ /* If the buffer is full, then go to sleep until the calling
+ thread wakes us up (i.e. when it has removed data from the
+ buffer). We don't wait forever to prevent stalling the
+ download thread. (Hopefully sleeping will throttle the
+ sender.) */
+ if (state->data.size() > 1024 * 1024) {
+ debug("download buffer is full; going to sleep");
+ state.wait_for(state->request, std::chrono::seconds(10));
+ }
+
+ /* Append data to the buffer and wake up the calling
+ thread. */
+ state->data.append(data);
+ state->avail.notify_one();
+ };
+
+ enqueueFileTransfer(request,
+ {[_state](std::future<FileTransferResult> fut) {
+ auto state(_state->lock());
+ state->quit = true;
+ try {
+ fut.get();
+ } catch (...) {
+ state->exc = std::current_exception();
+ }
+ state->avail.notify_one();
+ state->request.notify_one();
+ }});
+
+ while (true) {
+ checkInterrupt();
+
+ std::string chunk;
+
+ /* Grab data if available, otherwise wait for the download
+ thread to wake us up. */
+ {
+ auto state(_state->lock());
+
+ if (state->data.empty()) {
+
+ if (state->quit) {
+ if (state->exc) std::rethrow_exception(state->exc);
+ return;
+ }
+
+ state.wait(state->avail);
+
+ if (state->data.empty()) continue;
+ }
+
+ chunk = std::move(state->data);
+ /* Reset state->data after the move, since we check data.empty() */
+ state->data = "";
+
+ state->request.notify_one();
+ }
+
+ /* Flush the data to the sink and wake up the download thread
+ if it's blocked on a full buffer. We don't hold the state
+ lock while doing this to prevent blocking the download
+ thread if sink() takes a long time. */
+ sink(chunk);
+ }
+ }
};
ref<curlFileTransfer> makeCurlFileTransfer()
@@ -743,105 +842,6 @@ FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
return enqueueFileTransfer(request).get();
}
-void FileTransfer::download(FileTransferRequest && request, Sink & sink)
-{
- /* Note: we can't call 'sink' via request.dataCallback, because
- that would cause the sink to execute on the fileTransfer
- thread. If 'sink' is a coroutine, this will fail. Also, if the
- sink is expensive (e.g. one that does decompression and writing
- to the Nix store), it would stall the download thread too much.
- Therefore we use a buffer to communicate data between the
- download thread and the calling thread. */
-
- struct State {
- bool quit = false;
- std::exception_ptr exc;
- std::string data;
- std::condition_variable avail, request;
- };
-
- auto _state = std::make_shared<Sync<State>>();
-
- /* In case of an exception, wake up the download thread. FIXME:
- abort the download request. */
- Finally finally([&]() {
- auto state(_state->lock());
- state->quit = true;
- state->request.notify_one();
- });
-
- request.dataCallback = [_state](std::string_view data) {
-
- auto state(_state->lock());
-
- if (state->quit) return;
-
- /* If the buffer is full, then go to sleep until the calling
- thread wakes us up (i.e. when it has removed data from the
- buffer). We don't wait forever to prevent stalling the
- download thread. (Hopefully sleeping will throttle the
- sender.) */
- if (state->data.size() > 1024 * 1024) {
- debug("download buffer is full; going to sleep");
- state.wait_for(state->request, std::chrono::seconds(10));
- }
-
- /* Append data to the buffer and wake up the calling
- thread. */
- state->data.append(data);
- state->avail.notify_one();
- };
-
- enqueueFileTransfer(request,
- {[_state](std::future<FileTransferResult> fut) {
- auto state(_state->lock());
- state->quit = true;
- try {
- fut.get();
- } catch (...) {
- state->exc = std::current_exception();
- }
- state->avail.notify_one();
- state->request.notify_one();
- }});
-
- while (true) {
- checkInterrupt();
-
- std::string chunk;
-
- /* Grab data if available, otherwise wait for the download
- thread to wake us up. */
- {
- auto state(_state->lock());
-
- if (state->data.empty()) {
-
- if (state->quit) {
- if (state->exc) std::rethrow_exception(state->exc);
- return;
- }
-
- state.wait(state->avail);
-
- if (state->data.empty()) continue;
- }
-
- chunk = std::move(state->data);
- /* Reset state->data after the move, since we check data.empty() */
- state->data = "";
-
- state->request.notify_one();
- }
-
- /* Flush the data to the sink and wake up the download thread
- if it's blocked on a full buffer. We don't hold the state
- lock while doing this to prevent blocking the download
- thread if sink() takes a long time. */
- sink(chunk);
- }
-}
-
template<typename... Args>
FileTransferError::FileTransferError(FileTransfer::Error error, std::optional<std::string> response, const Args & ... args)
: Error(args...), error(error), response(response)