aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/builtins/fetchurl.cc2
-rw-r--r--src/libstore/filetransfer.cc120
-rw-r--r--src/libstore/filetransfer.hh10
-rw-r--r--src/libstore/http-binary-cache-store.cc2
-rw-r--r--src/nix/prefetch.cc2
-rw-r--r--tests/unit/libstore/filetransfer.cc18
6 files changed, 99 insertions, 55 deletions
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc
index 6bf46dad8..37d640fe4 100644
--- a/src/libstore/builtins/fetchurl.cc
+++ b/src/libstore/builtins/fetchurl.cc
@@ -41,7 +41,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData)
auto decompressor = makeDecompressionSink(
unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink);
- fileTransfer->download(std::move(request), *decompressor);
+ fileTransfer->download(std::move(request))->drainInto(*decompressor);
decompressor->finish();
});
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 065f38a0c..fcb947f96 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer
->callback.get_future();
}
- void download(FileTransferRequest && request, Sink & sink) override
+ box_ptr<Source> download(FileTransferRequest && request) override
{
- /* Note: we can't call 'sink' via request.dataCallback, because
- that would cause the sink to execute on the fileTransfer
- thread. If 'sink' is a coroutine, this will fail. Also, if the
- sink is expensive (e.g. one that does decompression and writing
- to the Nix store), it would stall the download thread too much.
- Therefore we use a buffer to communicate data between the
- download thread and the calling thread. */
-
struct State {
bool done = false, failed = false;
std::exception_ptr exc;
@@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer
auto _state = std::make_shared<Sync<State>>();
- /* In case of an exception, wake up the download thread. */
- Finally finally([&]() {
- auto state(_state->lock());
- state->failed |= std::uncaught_exceptions() != 0;
- state->request.notify_one();
- });
-
enqueueFileTransfer(
request,
[_state](std::exception_ptr ex) {
@@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer
}
);
- std::unique_ptr<FinishSink> decompressor;
-
- while (true) {
- checkInterrupt();
-
+ struct InnerSource : Source
+ {
+ const std::shared_ptr<Sync<State>> _state;
std::string chunk;
+ std::string_view buffered;
+
+ explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {}
- /* Grab data if available, otherwise wait for the download
- thread to wake us up. */
+ ~InnerSource()
{
+ // wake up the download thread if it's still going and have it abort
auto state(_state->lock());
+ state->failed |= !state->done;
+ state->request.notify_one();
+ }
- if (state->data.empty()) {
-
- if (state->done) {
- if (state->exc) std::rethrow_exception(state->exc);
- if (decompressor) {
- decompressor->finish();
+ void awaitData(Sync<State>::Lock & state)
+ {
+ /* Grab data if available, otherwise wait for the download
+ thread to wake us up. */
+ while (buffered.empty()) {
+ if (state->data.empty()) {
+ if (state->done) {
+ if (state->exc) {
+ std::rethrow_exception(state->exc);
+ }
+ return;
}
- return;
+
+ state.wait(state->avail);
}
- state.wait(state->avail);
+ chunk = std::move(state->data);
+ buffered = chunk;
+ state->request.notify_one();
+ }
+ }
- if (state->data.empty()) continue;
+ size_t read(char * data, size_t len) override
+ {
+ auto readPartial = [this](char * data, size_t len) {
+ const auto available = std::min(len, buffered.size());
+ memcpy(data, buffered.data(), available);
+ buffered.remove_prefix(available);
+ return available;
+ };
+ size_t total = readPartial(data, len);
+
+ while (total < len) {
+ {
+ auto state(_state->lock());
+ awaitData(state);
+ }
+ const auto current = readPartial(data + total, len - total);
+ total += current;
+ if (total == 0 || current == 0) {
+ break;
+ }
+ }
+
+ if (total == 0) {
+ throw EndOfFile("download finished");
}
- chunk = std::move(state->data);
- /* Reset state->data after the move, since we check data.empty() */
- state->data = "";
+ return total;
+ }
+ };
+
+ struct DownloadSource : Source
+ {
+ InnerSource inner;
+ std::unique_ptr<Source> decompressor;
+
+ explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {}
+
+ size_t read(char * data, size_t len) override
+ {
+ checkInterrupt();
if (!decompressor) {
- decompressor = makeDecompressionSink(state->encoding, sink);
+ auto state(inner._state->lock());
+ inner.awaitData(state);
+ decompressor = makeDecompressionSource(state->encoding, inner);
}
- state->request.notify_one();
+ return decompressor->read(data, len);
}
+ };
- /* Flush the data to the sink and wake up the download thread
- if it's blocked on a full buffer. We don't hold the state
- lock while doing this to prevent blocking the download
- thread if sink() takes a long time. */
- (*decompressor)(chunk);
- }
+ auto source = make_box_ptr<DownloadSource>(_state);
+ auto lock(_state->lock());
+ source->inner.awaitData(lock);
+ return source;
}
};
diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh
index 5d739112b..b2ee66312 100644
--- a/src/libstore/filetransfer.hh
+++ b/src/libstore/filetransfer.hh
@@ -1,6 +1,7 @@
#pragma once
///@file
+#include "box_ptr.hh"
#include "logging.hh"
#include "serialise.hh"
#include "types.hh"
@@ -104,10 +105,13 @@ struct FileTransfer
FileTransferResult transfer(const FileTransferRequest & request);
/**
- * Download a file, writing its data to a sink. The sink will be
- * invoked on the thread of the caller.
+ * Download a file, returning its contents through a source. Will not return
+ * before the transfer has fully started, ensuring that any errors thrown by
+ * the setup phase (e.g. HTTP 404 or similar errors) are not postponed to be
+ * thrown by the returned source. The source will only throw errors detected
+ * during the transfer itself (decompression errors, connection drops, etc).
*/
- virtual void download(FileTransferRequest && request, Sink & sink) = 0;
+ virtual box_ptr<Source> download(FileTransferRequest && request) = 0;
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index f64da2569..06297e2eb 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -155,7 +155,7 @@ protected:
checkEnabled();
auto request(makeRequest(path));
try {
- getFileTransfer()->download(std::move(request), sink);
+ getFileTransfer()->download(std::move(request))->drainInto(sink);
} catch (FileTransferError & e) {
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
diff --git a/src/nix/prefetch.cc b/src/nix/prefetch.cc
index 2457e4cc8..cad70e726 100644
--- a/src/nix/prefetch.cc
+++ b/src/nix/prefetch.cc
@@ -98,7 +98,7 @@ std::tuple<StorePath, Hash> prefetchFile(
FdSink sink(fd.get());
FileTransferRequest req(url);
- getFileTransfer()->download(std::move(req), sink);
+ getFileTransfer()->download(std::move(req))->drainInto(sink);
}
/* Optionally unpack the file. */
diff --git a/tests/unit/libstore/filetransfer.cc b/tests/unit/libstore/filetransfer.cc
index b60963a46..6e8cf3bbe 100644
--- a/tests/unit/libstore/filetransfer.cc
+++ b/tests/unit/libstore/filetransfer.cc
@@ -7,6 +7,7 @@
#include <gtest/gtest.h>
#include <netinet/in.h>
#include <stdexcept>
+#include <string>
#include <string_view>
#include <sys/poll.h>
#include <sys/socket.h>
@@ -136,7 +137,7 @@ TEST(FileTransfer, exceptionAbortsDownload)
LambdaSink broken([](auto block) { throw Done(); });
- ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"), broken), Done);
+ ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"))->drainInto(broken), Done);
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
// can't default-construct it we'll have to overwrite it instead, but we'll
@@ -159,16 +160,21 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
FileTransferError);
}
-TEST(FileTransfer, NOT_ON_DARWIN(reportsTransferError))
+TEST(FileTransfer, NOT_ON_DARWIN(defersFailures))
{
- auto [port, srv] = serveHTTP("200 ok", "content-length: 100\r\n", [] {
+ auto [port, srv] = serveHTTP("200 ok", "content-length: 100000000\r\n", [] {
std::this_thread::sleep_for(10ms);
- return "";
+ // just a bunch of data to fill the curl wrapper buffer, otherwise the
+ // initial wait for header data will also wait for the the response to
+ // complete (the source is only woken when curl returns data, and curl
+ // might only do so once its internal buffer has already been filled.)
+ return std::string(1024 * 1024, ' ');
});
auto ft = makeFileTransfer();
FileTransferRequest req(fmt("http://[::1]:%d/index", port));
req.baseRetryTimeMs = 0;
- ASSERT_THROW(ft->transfer(req), FileTransferError);
+ auto src = ft->download(std::move(req));
+ ASSERT_THROW(src->drain(), FileTransferError);
}
TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
@@ -180,7 +186,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
auto ft = makeFileTransfer();
StringSink sink;
- ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)), sink);
+ ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->drainInto(sink);
EXPECT_EQ(sink.s, original);
}