aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
authoreldritch horrors <pennae@lix.systems>2024-05-09 21:22:48 +0200
committereldritch horrors <pennae@lix.systems>2024-06-19 10:50:12 +0000
commitc55dcc6c13b864dc613a0a6ba51e0b897868f4b4 (patch)
tree2c588cf6dd0662b9b8875e9dd7802b564e9306c5 /src/libstore
parent11f4a5bc7eca8a4cca2ae9f3d83b69cd497933f8 (diff)
filetransfer: return a Source from download()
without this we will not be able to get rid of makeDecompressionSink, which in turn will be necessary to get rid of sourceToSink (since the libarchive archive wrapper *must* be a Source due to api limitations) Change-Id: Iccd3d333ba2cbcab49cb5a1d3125624de16bce27
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/builtins/fetchurl.cc2
-rw-r--r--src/libstore/filetransfer.cc120
-rw-r--r--src/libstore/filetransfer.hh10
-rw-r--r--src/libstore/http-binary-cache-store.cc2
4 files changed, 86 insertions, 48 deletions
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc
index 6bf46dad8..37d640fe4 100644
--- a/src/libstore/builtins/fetchurl.cc
+++ b/src/libstore/builtins/fetchurl.cc
@@ -41,7 +41,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto decompressor = makeDecompressionSink(
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink);
- fileTransfer->download(std::move(request), *decompressor);
+ fileTransfer->download(std::move(request))->drainInto(*decompressor);
decompressor->finish();
});
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 065f38a0c..fcb947f96 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer
->callback.get_future();
}
- void download(FileTransferRequest && request, Sink & sink) override
+ box_ptr<Source> download(FileTransferRequest && request) override
{
- /* Note: we can't call 'sink' via request.dataCallback, because
- that would cause the sink to execute on the fileTransfer
- thread. If 'sink' is a coroutine, this will fail. Also, if the
- sink is expensive (e.g. one that does decompression and writing
- to the Nix store), it would stall the download thread too much.
- Therefore we use a buffer to communicate data between the
- download thread and the calling thread. */
-
struct State {
bool done = false, failed = false;
std::exception_ptr exc;
@@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>();
- /* In case of an exception, wake up the download thread. */
- Finally finally([&]() {
- auto state(_state->lock());
- state->failed |= std::uncaught_exceptions() != 0;
- state->request.notify_one();
- });
-
enqueueFileTransfer(
request,
[_state](std::exception_ptr ex) {
@@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer
}
);
- std::unique_ptr<FinishSink> decompressor;
-
- while (true) {
- checkInterrupt();
-
+ struct InnerSource : Source
+ {
+ const std::shared_ptr<Sync<State>> _state;
std::string chunk;
+ std::string_view buffered;
+
+ explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {}
- /* Grab data if available, otherwise wait for the download
- thread to wake us up. */
+ ~InnerSource()
{
+ // wake up the download thread if it's still going and have it abort
auto state(_state->lock());
+ state->failed |= !state->done;
+ state->request.notify_one();
+ }
- if (state->data.empty()) {
-
- if (state->done) {
- if (state->exc) std::rethrow_exception(state->exc);
- if (decompressor) {
- decompressor->finish();
+ void awaitData(Sync<State>::Lock & state)
+ {
+ /* Grab data if available, otherwise wait for the download
+ thread to wake us up. */
+ while (buffered.empty()) {
+ if (state->data.empty()) {
+ if (state->done) {
+ if (state->exc) {
+ std::rethrow_exception(state->exc);
+ }
+ return;
}
- return;
+
+ state.wait(state->avail);
}
- state.wait(state->avail);
+ chunk = std::move(state->data);
+ buffered = chunk;
+ state->request.notify_one();
+ }
+ }
- if (state->data.empty()) continue;
+ size_t read(char * data, size_t len) override
+ {
+ auto readPartial = [this](char * data, size_t len) {
+ const auto available = std::min(len, buffered.size());
+ memcpy(data, buffered.data(), available);
+ buffered.remove_prefix(available);
+ return available;
+ };
+ size_t total = readPartial(data, len);
+
+ while (total < len) {
+ {
+ auto state(_state->lock());
+ awaitData(state);
+ }
+ const auto current = readPartial(data + total, len - total);
+ total += current;
+ if (total == 0 || current == 0) {
+ break;
+ }
+ }
+
+ if (total == 0) {
+ throw EndOfFile("download finished");
}
- chunk = std::move(state->data);
- /* Reset state->data after the move, since we check data.empty() */
- state->data = "";
+ return total;
+ }
+ };
+
+ struct DownloadSource : Source
+ {
+ InnerSource inner;
+ std::unique_ptr<Source> decompressor;
+
+ explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {}
+
+ size_t read(char * data, size_t len) override
+ {
+ checkInterrupt();
if (!decompressor) {
- decompressor = makeDecompressionSink(state->encoding, sink);
+ auto state(inner._state->lock());
+ inner.awaitData(state);
+ decompressor = makeDecompressionSource(state->encoding, inner);
}
- state->request.notify_one();
+ return decompressor->read(data, len);
}
+ };
- /* Flush the data to the sink and wake up the download thread
- if it's blocked on a full buffer. We don't hold the state
- lock while doing this to prevent blocking the download
- thread if sink() takes a long time. */
- (*decompressor)(chunk);
- }
+ auto source = make_box_ptr<DownloadSource>(_state);
+ auto lock(_state->lock());
+ source->inner.awaitData(lock);
+ return source;
}
};
diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh
index 5d739112b..b2ee66312 100644
--- a/src/libstore/filetransfer.hh
+++ b/src/libstore/filetransfer.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "box_ptr.hh"
#include "logging.hh"
#include "serialise.hh"
#include "types.hh"
@@ -104,10 +105,13 @@ struct FileTransfer
FileTransferResult transfer(const FileTransferRequest & request);
/**
- * Download a file, writing its data to a sink. The sink will be
- * invoked on the thread of the caller.
+ * Download a file, returning its contents through a source. Will not return
+ * before the transfer has fully started, ensuring that any errors thrown by
+ * the setup phase (e.g. HTTP 404 or similar errors) are not postponed to be
+ * thrown by the returned source. The source will only throw errors detected
+ * during the transfer itself (decompression errors, connection drops, etc).
*/
- virtual void download(FileTransferRequest && request, Sink & sink) = 0;
+ virtual box_ptr<Source> download(FileTransferRequest && request) = 0;
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index f64da2569..06297e2eb 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -155,7 +155,7 @@ protected:
checkEnabled();
auto request(makeRequest(path));
try {
- getFileTransfer()->download(std::move(request), sink);
+ getFileTransfer()->download(std::move(request))->drainInto(sink);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());