aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/filetransfer.cc
diff options
context:
space:
mode:
authoreldritch horrors <pennae@lix.systems>2024-04-25 19:30:01 +0200
committereldritch horrors <pennae@lix.systems>2024-04-26 15:26:37 +0000
commita1ad4e52a667d76472e8a5a3daf44c0eb34c2150 (patch)
tree69ad1f98d7aa9456ba48f8c536ee239f8de59ea1 /src/libstore/filetransfer.cc
parentfb0996aaa838d45b69c2b1bfd488874bacd5fc79 (diff)
filetransfer: don't decompress in curl wrapper itself
only decompress the response once all data has been received (in the fully buffered case), or at least outside of the curl wrapper itself (in the receive-to-sink case). unfortunately this means we will have to duplicate decompression logic for these two cases for time being, but once the curl wrapper has been rewritten to return a real future or Source we can deduplicate this logic again. the curl wrapper will have to turn into a proper Source first and use decompression source logic which also does not currently exist—only decompression *sinks* Change-Id: I66bc692f07d9b9e69fe10689ee73a2de8d65e35c
Diffstat (limited to 'src/libstore/filetransfer.cc')
-rw-r--r--src/libstore/filetransfer.cc58
1 files changed, 28 insertions, 30 deletions
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index aa293d2bd..c9f5d6260 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -49,7 +49,7 @@ struct curlFileTransfer : public FileTransfer
Activity act;
bool done = false; // whether either the success or failure function has been called
Callback<FileTransferResult> callback;
- std::function<void(std::string_view data)> dataCallback;
+ std::function<void(TransferItem &, std::string_view data)> dataCallback;
CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object
bool headersProcessed = false;
@@ -84,7 +84,7 @@ struct curlFileTransfer : public FileTransfer
TransferItem(curlFileTransfer & fileTransfer,
const FileTransferRequest & request,
Callback<FileTransferResult> && callback,
- std::function<void(std::string_view data)> dataCallback)
+ std::function<void(TransferItem &, std::string_view data)> dataCallback)
: fileTransfer(fileTransfer)
, request(request)
, act(*logger, lvlTalkative, actFileTransfer,
@@ -92,16 +92,6 @@ struct curlFileTransfer : public FileTransfer
{request.uri}, request.parentAct)
, callback(std::move(callback))
, dataCallback(std::move(dataCallback))
- , finalSink([this](std::string_view data) {
- auto httpStatus = getHTTPStatus();
- /* Only write data to the sink if this is a
- successful response. */
- if (successfulStatuses.count(httpStatus) && this->dataCallback) {
- writtenToSink += data.size();
- this->dataCallback(data);
- } else
- this->result.data.append(data);
- })
{
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
if (!request.expectedETag.empty())
@@ -142,9 +132,6 @@ struct curlFileTransfer : public FileTransfer
failEx(std::make_exception_ptr(std::forward<T>(e)));
}
- LambdaSink finalSink;
- std::shared_ptr<FinishSink> decompressionSink;
-
std::exception_ptr writeException;
std::optional<std::string> getHeader(const char * name)
@@ -177,12 +164,13 @@ struct curlFileTransfer : public FileTransfer
size_t realSize = size * nmemb;
result.bodySize += realSize;
- if (!decompressionSink) {
- decompressionSink = makeDecompressionSink(encoding, finalSink);
+ if (successfulStatuses.count(getHTTPStatus()) && this->dataCallback) {
+ writtenToSink += realSize;
+ dataCallback(*this, {(const char *) contents, realSize});
+ } else {
+ this->result.data.append((const char *) contents, realSize);
}
- (*decompressionSink)({(char *) contents, realSize});
-
return realSize;
} catch (...) {
writeException = std::current_exception();
@@ -345,14 +333,6 @@ struct curlFileTransfer : public FileTransfer
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes",
request.verb(), request.uri, code, httpStatus, result.bodySize);
- if (decompressionSink) {
- try {
- decompressionSink->finish();
- } catch (...) {
- writeException = std::current_exception();
- }
- }
-
auto link = getHeader("link");
if (!link) {
link = getHeader("x-amz-meta-link");
@@ -372,6 +352,14 @@ struct curlFileTransfer : public FileTransfer
result.etag = std::move(*etag);
}
+ // this has to happen here until we can return an actual future.
+ // wrapping user `callback`s instead is not possible because the
+ // Callback api expects std::functions, and copying Callbacks is
+ // not possible due the promises they hold.
+ if (code == CURLE_OK && !dataCallback) {
+ result.data = decompress(encoding, result.data);
+ }
+
if (writeException)
failEx(writeException);
@@ -674,7 +662,7 @@ struct curlFileTransfer : public FileTransfer
void enqueueFileTransfer(const FileTransferRequest & request,
Callback<FileTransferResult> callback,
- std::function<void(std::string_view data)> dataCallback)
+ std::function<void(TransferItem &, std::string_view data)> dataCallback)
{
/* Ugly hack to support s3:// URIs. */
if (request.uri.starts_with("s3://")) {
@@ -724,6 +712,7 @@ struct curlFileTransfer : public FileTransfer
std::exception_ptr exc;
std::string data;
std::condition_variable avail, request;
+ std::unique_ptr<FinishSink> decompressor;
};
auto _state = std::make_shared<Sync<State>>();
@@ -748,11 +737,15 @@ struct curlFileTransfer : public FileTransfer
state->avail.notify_one();
state->request.notify_one();
}},
- [_state](std::string_view data) {
+ [_state, &sink](TransferItem & transfer, std::string_view data) {
auto state(_state->lock());
if (state->quit) return;
+ if (!state->decompressor) {
+ state->decompressor = makeDecompressionSink(transfer.encoding, sink);
+ }
+
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). We don't wait forever to prevent stalling the
@@ -773,6 +766,7 @@ struct curlFileTransfer : public FileTransfer
checkInterrupt();
std::string chunk;
+ FinishSink * sink = nullptr;
/* Grab data if available, otherwise wait for the download
thread to wake us up. */
@@ -783,6 +777,9 @@ struct curlFileTransfer : public FileTransfer
if (state->quit) {
if (state->exc) std::rethrow_exception(state->exc);
+ if (state->decompressor) {
+ state->decompressor->finish();
+ }
return;
}
@@ -792,6 +789,7 @@ struct curlFileTransfer : public FileTransfer
}
chunk = std::move(state->data);
+ sink = state->decompressor.get();
/* Reset state->data after the move, since we check data.empty() */
state->data = "";
@@ -802,7 +800,7 @@ struct curlFileTransfer : public FileTransfer
if it's blocked on a full buffer. We don't hold the state
lock while doing this to prevent blocking the download
thread if sink() takes a long time. */
- sink(chunk);
+ (*sink)(chunk);
}
}
};