diff options
Diffstat (limited to 'src/libstore/filetransfer.cc')
-rw-r--r-- | src/libstore/filetransfer.cc | 120 |
1 files changed, 77 insertions, 43 deletions
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 065f38a0c..fcb947f96 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -686,16 +686,8 @@ struct curlFileTransfer : public FileTransfer ->callback.get_future(); } - void download(FileTransferRequest && request, Sink & sink) override + box_ptr<Source> download(FileTransferRequest && request) override { - /* Note: we can't call 'sink' via request.dataCallback, because - that would cause the sink to execute on the fileTransfer - thread. If 'sink' is a coroutine, this will fail. Also, if the - sink is expensive (e.g. one that does decompression and writing - to the Nix store), it would stall the download thread too much. - Therefore we use a buffer to communicate data between the - download thread and the calling thread. */ - struct State { bool done = false, failed = false; std::exception_ptr exc; @@ -705,13 +697,6 @@ struct curlFileTransfer : public FileTransfer auto _state = std::make_shared<Sync<State>>(); - /* In case of an exception, wake up the download thread. */ - Finally finally([&]() { - auto state(_state->lock()); - state->failed |= std::uncaught_exceptions() != 0; - state->request.notify_one(); - }); - enqueueFileTransfer( request, [_state](std::exception_ptr ex) { @@ -750,50 +735,99 @@ struct curlFileTransfer : public FileTransfer } ); - std::unique_ptr<FinishSink> decompressor; - - while (true) { - checkInterrupt(); - + struct InnerSource : Source + { + const std::shared_ptr<Sync<State>> _state; std::string chunk; + std::string_view buffered; + + explicit InnerSource(const std::shared_ptr<Sync<State>> & state) : _state(state) {} - /* Grab data if available, otherwise wait for the download - thread to wake us up. */ + ~InnerSource() { + // wake up the download thread if it's still going and have it abort auto state(_state->lock()); + state->failed |= !state->done; + state->request.notify_one(); + } - if (state->data.empty()) { - - if (state->done) { - if (state->exc) std::rethrow_exception(state->exc); - if (decompressor) { - decompressor->finish(); + void awaitData(Sync<State>::Lock & state) + { + /* Grab data if available, otherwise wait for the download + thread to wake us up. */ + while (buffered.empty()) { + if (state->data.empty()) { + if (state->done) { + if (state->exc) { + std::rethrow_exception(state->exc); + } + return; } - return; + + state.wait(state->avail); } - state.wait(state->avail); + chunk = std::move(state->data); + buffered = chunk; + state->request.notify_one(); + } + } - if (state->data.empty()) continue; + size_t read(char * data, size_t len) override + { + auto readPartial = [this](char * data, size_t len) { + const auto available = std::min(len, buffered.size()); + memcpy(data, buffered.data(), available); + buffered.remove_prefix(available); + return available; + }; + size_t total = readPartial(data, len); + + while (total < len) { + { + auto state(_state->lock()); + awaitData(state); + } + const auto current = readPartial(data + total, len - total); + total += current; + if (total == 0 || current == 0) { + break; + } + } + + if (total == 0) { + throw EndOfFile("download finished"); } - chunk = std::move(state->data); - /* Reset state->data after the move, since we check data.empty() */ - state->data = ""; + return total; + } + }; + + struct DownloadSource : Source + { + InnerSource inner; + std::unique_ptr<Source> decompressor; + + explicit DownloadSource(const std::shared_ptr<Sync<State>> & state) : inner(state) {} + + size_t read(char * data, size_t len) override + { + checkInterrupt(); if (!decompressor) { - decompressor = makeDecompressionSink(state->encoding, sink); + auto state(inner._state->lock()); + inner.awaitData(state); + decompressor = makeDecompressionSource(state->encoding, inner); } - state->request.notify_one(); + return decompressor->read(data, len); } + }; - /* Flush the data to the sink and wake up the download thread - if it's blocked on a full buffer. We don't hold the state - lock while doing this to prevent blocking the download - thread if sink() takes a long time. */ - (*decompressor)(chunk); - } + auto source = make_box_ptr<DownloadSource>(_state); + auto lock(_state->lock()); + source->inner.awaitData(lock); + return source; } }; |