aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/builtins/fetchurl.cc2
-rw-r--r--src/libstore/filetransfer.cc120
-rw-r--r--src/libstore/filetransfer.hh10
-rw-r--r--src/libstore/http-binary-cache-store.cc2
4 files changed, 86 insertions, 48 deletions
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc
index 6bf46dad8..37d640fe4 100644
--- a/src/libstore/builtins/fetchurl.cc
+++ b/src/libstore/builtins/fetchurl.cc
@@ -41,7 +41,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto decompressor = makeDecompressionSink(
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink);
- fileTransfer->download(std::move(request), *decompressor);
+ fileTransfer->download(std::move(request))->drainInto(*decompressor);
decompressor->finish();
});
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 065f38a0c..fcb947f96 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer
->callback.get_future();
}
- void download(FileTransferRequest && request, Sink & sink) override
+ box_ptr<Source> download(FileTransferRequest && request) override
{
- /* Note: we can't call 'sink' via request.dataCallback, because
- that would cause the sink to execute on the fileTransfer
- thread. If 'sink' is a coroutine, this will fail. Also, if the
- sink is expensive (e.g. one that does decompression and writing
- to the Nix store), it would stall the download thread too much.
- Therefore we use a buffer to communicate data between the
- download thread and the calling thread. */
-
struct State {
bool done = false, failed = false;
std::exception_ptr exc;
@@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>();
- /* In case of an exception, wake up the download thread. */
- Finally finally([&]() {
- auto state(_state->lock());
- state->failed |= std::uncaught_exceptions() != 0;
- state->request.notify_one();
- });
-
enqueueFileTransfer(
request,
[_state](std::exception_ptr ex) {
@@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer
}
);
- std::unique_ptr<FinishSink> decompressor;
-
- while (true) {
- checkInterrupt();
-
+ struct InnerSource : Source
+ {
+ const std::shared_ptr<Sync<State>> _state;
std::string chunk;
+ std::string_view buffered;
+
+ explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {}
- /* Grab data if available, otherwise wait for the download
- thread to wake us up. */
+ ~InnerSource()
{
+ // wake up the download thread if it's still going and have it abort
auto state(_state->lock());
+ state->failed |= !state->done;
+ state->request.notify_one();
+ }
- if (state->data.empty()) {
-
- if (state->done) {
- if (state->exc) std::rethrow_exception(state->exc);
- if (decompressor) {
- decompressor->finish();
+ void awaitData(Sync<State>::Lock & state)
+ {
+ /* Grab data if available, otherwise wait for the download
+ thread to wake us up. */
+ while (buffered.empty()) {
+ if (state->data.empty()) {
+ if (state->done) {
+ if (state->exc) {
+ std::rethrow_exception(state->exc);
+ }
+ return;
}
- return;
+
+ state.wait(state->avail);
}
- state.wait(state->avail);
+ chunk = std::move(state->data);
+ buffered = chunk;
+ state->request.notify_one();
+ }
+ }
- if (state->data.empty()) continue;
+ size_t read(char * data, size_t len) override
+ {
+ auto readPartial = [this](char * data, size_t len) {
+ const auto available = std::min(len, buffered.size());
+ memcpy(data, buffered.data(), available);
+ buffered.remove_prefix(available);
+ return available;
+ };
+ size_t total = readPartial(data, len);
+
+ while (total < len) {
+ {
+ auto state(_state->lock());
+ awaitData(state);
+ }
+ const auto current = readPartial(data + total, len - total);
+ total += current;
+ if (total == 0 || current == 0) {
+ break;
+ }
+ }
+
+ if (total == 0) {
+ throw EndOfFile("download finished");
}
- chunk = std::move(state->data);
- /* Reset state->data after the move, since we check data.empty() */
- state->data = "";
+ return total;
+ }
+ };
+
+ struct DownloadSource : Source
+ {
+ InnerSource inner;
+ std::unique_ptr<Source> decompressor;
+
+ explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {}
+
+ size_t read(char * data, size_t len) override
+ {
+ checkInterrupt();
if (!decompressor) {
- decompressor = makeDecompressionSink(state->encoding, sink);
+ auto state(inner._state->lock());
+ inner.awaitData(state);
+ decompressor = makeDecompressionSource(state->encoding, inner);
}
- state->request.notify_one();
+ return decompressor->read(data, len);
}
+ };
- /* Flush the data to the sink and wake up the download thread
- if it's blocked on a full buffer. We don't hold the state
- lock while doing this to prevent blocking the download
- thread if sink() takes a long time. */
- (*decompressor)(chunk);
- }
+ auto source = make_box_ptr<DownloadSource>(_state);
+ auto lock(_state->lock());
+ source->inner.awaitData(lock);
+ return source;
}
};
diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh
index 5d739112b..b2ee66312 100644
--- a/src/libstore/filetransfer.hh
+++ b/src/libstore/filetransfer.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "box_ptr.hh"
#include "logging.hh"
#include "serialise.hh"
#include "types.hh"
@@ -104,10 +105,13 @@ struct FileTransfer
FileTransferResult transfer(const FileTransferRequest & request);
/**
- * Download a file, writing its data to a sink. The sink will be
- * invoked on the thread of the caller.
+ * Download a file, returning its contents through a source. Will not return
+ * before the transfer has fully started, ensuring that any errors thrown by
+ * the setup phase (e.g. HTTP 404 or similar errors) are not postponed to be
+ * thrown by the returned source. The source will only throw errors detected
+ * during the transfer itself (decompression errors, connection drops, etc).
*/
- virtual void download(FileTransferRequest && request, Sink & sink) = 0;
+ virtual box_ptr<Source> download(FileTransferRequest && request) = 0;
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index f64da2569..06297e2eb 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -155,7 +155,7 @@ protected:
checkEnabled();
auto request(makeRequest(path));
try {
- getFileTransfer()->download(std::move(request), sink);
+ getFileTransfer()->download(std::move(request))->drainInto(sink);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());