diff options
author | eldritch horrors <pennae@lix.systems> | 2024-04-25 14:44:02 +0200 |
---|---|---|
committer | eldritch horrors <pennae@lix.systems> | 2024-04-26 15:26:37 +0000 |
commit | fb0996aaa838d45b69c2b1bfd488874bacd5fc79 (patch) | |
tree | 03b97ff2845e9252500a39372a009ce62ad9c49f /src/libstore/filetransfer.cc | |
parent | dfe3baea12d759928b992dec0452f6636d42f990 (diff) |
filetransfer: remove dataCallback from interface
this is highly questionable. single-arg download calls will misbehave
with it set, and two-arg download calls will just overwrite it. being
an implementation detail this should not have been in the API at all.
Change-Id: I613772951ee03d8302366085f06a53601d13f132
Diffstat (limited to 'src/libstore/filetransfer.cc')
-rw-r--r-- | src/libstore/filetransfer.cc | 66 |
1 files changed, 38 insertions, 28 deletions
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index b4887d9e8..aa293d2bd 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -49,6 +49,7 @@ struct curlFileTransfer : public FileTransfer Activity act; bool done = false; // whether either the success or failure function has been called Callback<FileTransferResult> callback; + std::function<void(std::string_view data)> dataCallback; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object bool headersProcessed = false; @@ -82,20 +83,22 @@ struct curlFileTransfer : public FileTransfer TransferItem(curlFileTransfer & fileTransfer, const FileTransferRequest & request, - Callback<FileTransferResult> && callback) + Callback<FileTransferResult> && callback, + std::function<void(std::string_view data)> dataCallback) : fileTransfer(fileTransfer) , request(request) , act(*logger, lvlTalkative, actFileTransfer, fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri), {request.uri}, request.parentAct) , callback(std::move(callback)) + , dataCallback(std::move(dataCallback)) , finalSink([this](std::string_view data) { auto httpStatus = getHTTPStatus(); /* Only write data to the sink if this is a successful response. */ - if (successfulStatuses.count(httpStatus) && this->request.dataCallback) { + if (successfulStatuses.count(httpStatus) && this->dataCallback) { writtenToSink += data.size(); - this->request.dataCallback(data); + this->dataCallback(data); } else this->result.data.append(data); }) @@ -455,7 +458,7 @@ struct curlFileTransfer : public FileTransfer ranged requests. */ if (err == Transient && attempt < request.tries - && (!this->request.dataCallback + && (!this->dataCallback || writtenToSink == 0 || (acceptRanges && encoding.empty()))) { @@ -666,6 +669,13 @@ struct curlFileTransfer : public FileTransfer void enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) override { + enqueueFileTransfer(request, std::move(callback), {}); + } + + void enqueueFileTransfer(const FileTransferRequest & request, + Callback<FileTransferResult> callback, + std::function<void(std::string_view data)> dataCallback) + { /* Ugly hack to support s3:// URIs. */ if (request.uri.starts_with("s3://")) { // FIXME: do this on a worker thread @@ -694,7 +704,9 @@ struct curlFileTransfer : public FileTransfer return; } - enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback))); + enqueueItem(std::make_shared<TransferItem>( + *this, request, std::move(callback), std::move(dataCallback) + )); } void download(FileTransferRequest && request, Sink & sink) override @@ -724,28 +736,6 @@ struct curlFileTransfer : public FileTransfer state->request.notify_one(); }); - request.dataCallback = [_state](std::string_view data) { - - auto state(_state->lock()); - - if (state->quit) return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > 1024 * 1024) { - debug("download buffer is full; going to sleep"); - state.wait_for(state->request, std::chrono::seconds(10)); - } - - /* Append data to the buffer and wake up the calling - thread. */ - state->data.append(data); - state->avail.notify_one(); - }; - enqueueFileTransfer(request, {[_state](std::future<FileTransferResult> fut) { auto state(_state->lock()); @@ -757,7 +747,27 @@ struct curlFileTransfer : public FileTransfer } state->avail.notify_one(); state->request.notify_one(); - }}); + }}, + [_state](std::string_view data) { + auto state(_state->lock()); + + if (state->quit) return; + + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). We don't wait forever to prevent stalling the + download thread. (Hopefully sleeping will throttle the + sender.) */ + if (state->data.size() > 1024 * 1024) { + debug("download buffer is full; going to sleep"); + state.wait_for(state->request, std::chrono::seconds(10)); + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(data); + state->avail.notify_one(); + }); while (true) { checkInterrupt(); |