diff options
-rw-r--r-- | src/libstore/builtins/fetchurl.cc | 2 | ||||
-rw-r--r-- | src/libstore/filetransfer.cc | 120 | ||||
-rw-r--r-- | src/libstore/filetransfer.hh | 10 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 2 | ||||
-rw-r--r-- | src/nix/prefetch.cc | 2 | ||||
-rw-r--r-- | tests/unit/libstore/filetransfer.cc | 18 |
6 files changed, 99 insertions, 55 deletions
diff --git a/src/libstore/builtins/fetchurl.cc b/src/libstore/builtins/fetchurl.cc index 6bf46dad8..37d640fe4 100644 --- a/src/libstore/builtins/fetchurl.cc +++ b/src/libstore/builtins/fetchurl.cc @@ -41,7 +41,7 @@ void builtinFetchurl(const BasicDerivation & drv, const std::string & netrcData) auto decompressor = makeDecompressionSink( unpack && mainUrl.ends_with(".xz") ? "xz" : "none", sink); - fileTransfer->download(std::move(request), *decompressor); + fileTransfer->download(std::move(request))->drainInto(*decompressor); decompressor->finish(); }); diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 065f38a0c..fcb947f96 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer ->callback.get_future(); } - void download(FileTransferRequest && request, Sink & sink) override + box_ptr<Source> download(FileTransferRequest && request) override { - /* Note: we can't call 'sink' via request.dataCallback, because - that would cause the sink to execute on the fileTransfer - thread. If 'sink' is a coroutine, this will fail. Also, if the - sink is expensive (e.g. one that does decompression and writing - to the Nix store), it would stall the download thread too much. - Therefore we use a buffer to communicate data between the - download thread and the calling thread. */ - struct State { bool done = false, failed = false; std::exception_ptr exc; @@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared<Sync<State>>(); - /* In case of an exception, wake up the download thread. */ - Finally finally([&]() { - auto state(_state->lock()); - state->failed |= std::uncaught_exceptions() != 0; - state->request.notify_one(); - }); - enqueueFileTransfer( request, [_state](std::exception_ptr ex) { @@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer } ); - std::unique_ptr<FinishSink> decompressor; - - while (true) { - checkInterrupt(); - + struct InnerSource : Source + { + const std::shared_ptr<Sync<State>> _state; std::string chunk; + std::string_view buffered; + + explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {} - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ + ~InnerSource() { + // wake up the download thread if it's still going and have it abort auto state(_state->lock()); + state->failed |= !state->done; + state->request.notify_one(); + } - if (state->data.empty()) { - - if (state->done) { - if (state->exc) std::rethrow_exception(state->exc); - if (decompressor) { - decompressor->finish(); + void awaitData(Sync<State>::Lock & state) + { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + if (state->data.empty()) { + if (state->done) { + if (state->exc) { + std::rethrow_exception(state->exc); + } + return; } - return; + + state.wait(state->avail); } - state.wait(state->avail); + chunk = std::move(state->data); + buffered = chunk; + state->request.notify_one(); + } + } - if (state->data.empty()) continue; + size_t read(char * data, size_t len) override + { + auto readPartial = [this](char * data, size_t len) { + const auto available = std::min(len, buffered.size()); + memcpy(data, buffered.data(), available); + buffered.remove_prefix(available); + return available; + }; + size_t total = readPartial(data, len); + + while (total < len) { + { + auto state(_state->lock()); + awaitData(state); + } + const auto current = readPartial(data + total, len - total); + total += current; + if (total == 0 || current == 0) { + break; + } + } + + if (total == 0) { + throw EndOfFile("download finished"); } - chunk = std::move(state->data); - /* Reset state->data after the move, since we check data.empty() */ - state->data = ""; + return total; + } + }; + + struct DownloadSource : Source + { + InnerSource inner; + std::unique_ptr<Source> decompressor; + + explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {} + + size_t read(char * data, size_t len) override + { + checkInterrupt(); if (!decompressor) { - decompressor = makeDecompressionSink(state->encoding, sink); + auto state(inner._state->lock()); + inner.awaitData(state); + decompressor = makeDecompressionSource(state->encoding, inner); } - state->request.notify_one(); + return decompressor->read(data, len); } + }; - /* Flush the data to the sink and wake up the download thread - if it's blocked on a full buffer. We don't hold the state - lock while doing this to prevent blocking the download - thread if sink() takes a long time. */ - (*decompressor)(chunk); - } + auto source = make_box_ptr<DownloadSource>(_state); + auto lock(_state->lock()); + source->inner.awaitData(lock); + return source; } }; diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh index 5d739112b..b2ee66312 100644 --- a/src/libstore/filetransfer.hh +++ b/src/libstore/filetransfer.hh @@ -1,6 +1,7 @@ #pragma once ///@file +#include "box_ptr.hh" #include "logging.hh" #include "serialise.hh" #include "types.hh" @@ -104,10 +105,13 @@ struct FileTransfer FileTransferResult transfer(const FileTransferRequest & request); /** - * Download a file, writing its data to a sink. The sink will be - * invoked on the thread of the caller. + * Download a file, returning its contents through a source. Will not return + * before the transfer has fully started, ensuring that any errors thrown by + * the setup phase (e.g. HTTP 404 or similar errors) are not postponed to be + * thrown by the returned source. The source will only throw errors detected + * during the transfer itself (decompression errors, connection drops, etc). */ - virtual void download(FileTransferRequest && request, Sink & sink) = 0; + virtual box_ptr<Source> download(FileTransferRequest && request) = 0; enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; }; diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index f64da2569..06297e2eb 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -155,7 +155,7 @@ protected: checkEnabled(); auto request(makeRequest(path)); try { - getFileTransfer()->download(std::move(request), sink); + getFileTransfer()->download(std::move(request))->drainInto(sink); } catch (FileTransferError & e) { if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); diff --git a/src/nix/prefetch.cc b/src/nix/prefetch.cc index 2457e4cc8..cad70e726 100644 --- a/src/nix/prefetch.cc +++ b/src/nix/prefetch.cc @@ -98,7 +98,7 @@ std::tuple<StorePath, Hash> prefetchFile( FdSink sink(fd.get()); FileTransferRequest req(url); - getFileTransfer()->download(std::move(req), sink); + getFileTransfer()->download(std::move(req))->drainInto(sink); } /* Optionally unpack the file. */ diff --git a/tests/unit/libstore/filetransfer.cc b/tests/unit/libstore/filetransfer.cc index b60963a46..6e8cf3bbe 100644 --- a/tests/unit/libstore/filetransfer.cc +++ b/tests/unit/libstore/filetransfer.cc @@ -7,6 +7,7 @@ #include <gtest/gtest.h> #include <netinet/in.h> #include <stdexcept> +#include <string> #include <string_view> #include <sys/poll.h> #include <sys/socket.h> @@ -136,7 +137,7 @@ TEST(FileTransfer, exceptionAbortsDownload) LambdaSink broken([](auto block) { throw Done(); }); - ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"), broken), Done); + ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"))->drainInto(broken), Done); // makeFileTransfer returns a ref<>, which cannot be cleared. since we also // can't default-construct it we'll have to overwrite it instead, but we'll @@ -159,16 +160,21 @@ TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors)) FileTransferError); } -TEST(FileTransfer, NOT_ON_DARWIN(reportsTransferError)) +TEST(FileTransfer, NOT_ON_DARWIN(defersFailures)) { - auto [port, srv] = serveHTTP("200 ok", "content-length: 100\r\n", [] { + auto [port, srv] = serveHTTP("200 ok", "content-length: 100000000\r\n", [] { std::this_thread::sleep_for(10ms); - return ""; + // just a bunch of data to fill the curl wrapper buffer, otherwise the + // initial wait for header data will also wait for the the response to + // complete (the source is only woken when curl returns data, and curl + // might only do so once its internal buffer has already been filled.) + return std::string(1024 * 1024, ' '); }); auto ft = makeFileTransfer(); FileTransferRequest req(fmt("http://[::1]:%d/index", port)); req.baseRetryTimeMs = 0; - ASSERT_THROW(ft->transfer(req), FileTransferError); + auto src = ft->download(std::move(req)); + ASSERT_THROW(src->drain(), FileTransferError); } TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding)) @@ -180,7 +186,7 @@ TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding)) auto ft = makeFileTransfer(); StringSink sink; - ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)), sink); + ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)))->drainInto(sink); EXPECT_EQ(sink.s, original); } |