diff options
-rw-r--r-- | src/libstore/download.cc | 92 | ||||
-rw-r--r-- | src/libstore/download.hh | 5 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 22 |
3 files changed, 116 insertions, 3 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc index afb066e14..d450714ca 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -7,6 +7,7 @@ #include "s3.hh" #include "compression.hh" #include "pathlocks.hh" +#include "finally.hh" #ifdef ENABLE_S3 #include <aws/core/client/ClientConfiguration.h> @@ -137,7 +138,10 @@ struct CurlDownloader : public Downloader size_t writeCallback(void * contents, size_t size, size_t nmemb) { size_t realSize = size * nmemb; - result.data->append((char *) contents, realSize); + if (request.dataCallback) + request.dataCallback((char *) contents, realSize); + else + result.data->append((char *) contents, realSize); return realSize; } @@ -635,6 +639,92 @@ DownloadResult Downloader::download(const DownloadRequest & request) return enqueueDownload(request).get(); } +void Downloader::download(DownloadRequest && request, Sink & sink) +{ + /* Note: we can't call 'sink' via request.dataCallback, because + that would cause the sink to execute on the downloader + thread. If 'sink' is a coroutine, this will fail. Also, if the + sink is expensive (e.g. one that does decompression and writing + to the Nix store), it would stall the download thread too much. + Therefore we use a buffer to communicate data between the + download thread and the calling thread. */ + + struct State { + bool quit = false; + std::exception_ptr exc; + std::string data; + std::condition_variable avail, request; + }; + + auto _state = std::make_shared<Sync<State>>(); + + /* In case of an exception, wake up the download thread. FIXME: + abort the download request. */ + Finally finally([&]() { + auto state(_state->lock()); + state->quit = true; + state->request.notify_one(); + }); + + request.dataCallback = [_state](char * buf, size_t len) { + + auto state(_state->lock()); + + if (state->quit) return; + + /* If the buffer is full, then go to sleep until the calling + thread wakes us up (i.e. when it has removed data from the + buffer). Note: this does stall the download thread. */ + while (state->data.size() > 1024 * 1024) { + if (state->quit) return; + debug("download buffer is full; going to sleep"); + state.wait(state->request); + } + + /* Append data to the buffer and wake up the calling + thread. */ + state->data.append(buf, len); + state->avail.notify_one(); + }; + + enqueueDownload(request, + {[_state](std::future<DownloadResult> fut) { + auto state(_state->lock()); + state->quit = true; + try { + fut.get(); + } catch (...) { + state->exc = std::current_exception(); + } + state->avail.notify_one(); + state->request.notify_one(); + }}); + + auto state(_state->lock()); + + while (true) { + checkInterrupt(); + + if (state->quit) { + if (state->exc) std::rethrow_exception(state->exc); + break; + } + + /* If no data is available, then wait for the download thread + to wake us up. */ + if (state->data.empty()) + state.wait(state->avail); + + /* If data is available, then flush it to the sink and wake up + the download thread if it's blocked on a full buffer. */ + if (!state->data.empty()) { + sink((unsigned char *) state->data.data(), state->data.size()); + state->data.clear(); + state->request.notify_one(); + } + } +} + Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl) { auto url = resolveUri(url_); diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 01940f544..f56274b23 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -21,6 +21,7 @@ struct DownloadRequest bool decompress = true; std::shared_ptr<std::string> data; std::string mimeType; + std::function<void(char *, size_t)> dataCallback; DownloadRequest(const std::string & uri) : uri(uri), parentAct(getCurActivity()) { } @@ -49,6 +50,10 @@ struct Downloader /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); + /* Download a file, writing its data to a sink. The sink will be + invoked on the thread of the caller. */ + void download(DownloadRequest && request, Sink & sink); + /* Check if the specified file is already in ~/.cache/nix/tarballs and is more recent than ‘tarball-ttl’ seconds. Otherwise, use the recorded ETag to verify if the server has a more diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index b8d670417..6fdae40e3 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -77,11 +77,29 @@ protected: } } - void getFile(const std::string & path, - Callback<std::shared_ptr<std::string>> callback) override + DownloadRequest makeRequest(const std::string & path) { DownloadRequest request(cacheUri + "/" + path); request.tries = 8; + return request; + } + + void getFile(const std::string & path, Sink & sink) override + { + auto request(makeRequest(path)); + try { + getDownloader()->download(std::move(request), sink); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri()); + throw; + } + } + + void getFile(const std::string & path, + Callback<std::shared_ptr<std::string>> callback) override + { + auto request(makeRequest(path)); getDownloader()->enqueueDownload(request, {[callback](std::future<DownloadResult> result) { |