aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2018-03-28 00:01:47 +0200
committerEelco Dolstra <edolstra@gmail.com>2018-05-30 13:42:29 +0200
commite87e4a60d617bffadfedf23032254130cdb4d54d (patch)
tree2d17899e41f6372d0b64e74b2c260170248d1acd
parent08ec757726e5ef47e71bf16ed0b252b288bcf0f3 (diff)
Make HttpBinaryCacheStore::narFromPath() run in constant memory
This reduces memory consumption of nix copy --from https://cache.nixos.org --to ~/my-nix /nix/store/95cwv4q54dc6giaqv6q6p4r02ia2km35-blender-2.79 from 176 MiB to 82 MiB. (The remaining memory is probably due to xz decompression overhead.) Issue https://github.com/NixOS/nix/issues/1681. Issue https://github.com/NixOS/nix/issues/1969.
-rw-r--r--src/libstore/download.cc92
-rw-r--r--src/libstore/download.hh5
-rw-r--r--src/libstore/http-binary-cache-store.cc22
3 files changed, 116 insertions, 3 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index afb066e14..d450714ca 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -7,6 +7,7 @@
#include "s3.hh"
#include "compression.hh"
#include "pathlocks.hh"
+#include "finally.hh"
#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
@@ -137,7 +138,10 @@ struct CurlDownloader : public Downloader
size_t writeCallback(void * contents, size_t size, size_t nmemb)
{
size_t realSize = size * nmemb;
- result.data->append((char *) contents, realSize);
+ if (request.dataCallback)
+ request.dataCallback((char *) contents, realSize);
+ else
+ result.data->append((char *) contents, realSize);
return realSize;
}
@@ -635,6 +639,92 @@ DownloadResult Downloader::download(const DownloadRequest & request)
return enqueueDownload(request).get();
}
+void Downloader::download(DownloadRequest && request, Sink & sink)
+{
+ /* Note: we can't call 'sink' via request.dataCallback, because
+ that would cause the sink to execute on the downloader
+ thread. If 'sink' is a coroutine, this will fail. Also, if the
+ sink is expensive (e.g. one that does decompression and writing
+ to the Nix store), it would stall the download thread too much.
+ Therefore we use a buffer to communicate data between the
+ download thread and the calling thread. */
+
+ struct State {
+ bool quit = false;
+ std::exception_ptr exc;
+ std::string data;
+ std::condition_variable avail, request;
+ };
+
+ auto _state = std::make_shared<Sync<State>>();
+
+ /* In case of an exception, wake up the download thread. FIXME:
+ abort the download request. */
+ Finally finally([&]() {
+ auto state(_state->lock());
+ state->quit = true;
+ state->request.notify_one();
+ });
+
+ request.dataCallback = [_state](char * buf, size_t len) {
+
+ auto state(_state->lock());
+
+ if (state->quit) return;
+
+ /* If the buffer is full, then go to sleep until the calling
+ thread wakes us up (i.e. when it has removed data from the
+ buffer). Note: this does stall the download thread. */
+ while (state->data.size() > 1024 * 1024) {
+ if (state->quit) return;
+ debug("download buffer is full; going to sleep");
+ state.wait(state->request);
+ }
+
+ /* Append data to the buffer and wake up the calling
+ thread. */
+ state->data.append(buf, len);
+ state->avail.notify_one();
+ };
+
+ enqueueDownload(request,
+ {[_state](std::future<DownloadResult> fut) {
+ auto state(_state->lock());
+ state->quit = true;
+ try {
+ fut.get();
+ } catch (...) {
+ state->exc = std::current_exception();
+ }
+ state->avail.notify_one();
+ state->request.notify_one();
+ }});
+
+ auto state(_state->lock());
+
+ while (true) {
+ checkInterrupt();
+
+ if (state->quit) {
+ if (state->exc) std::rethrow_exception(state->exc);
+ break;
+ }
+
+ /* If no data is available, then wait for the download thread
+ to wake us up. */
+ if (state->data.empty())
+ state.wait(state->avail);
+
+ /* If data is available, then flush it to the sink and wake up
+ the download thread if it's blocked on a full buffer. */
+ if (!state->data.empty()) {
+ sink((unsigned char *) state->data.data(), state->data.size());
+ state->data.clear();
+ state->request.notify_one();
+ }
+ }
+}
+
Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl)
{
auto url = resolveUri(url_);
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 01940f544..f56274b23 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -21,6 +21,7 @@ struct DownloadRequest
bool decompress = true;
std::shared_ptr<std::string> data;
std::string mimeType;
+ std::function<void(char *, size_t)> dataCallback;
DownloadRequest(const std::string & uri)
: uri(uri), parentAct(getCurActivity()) { }
@@ -49,6 +50,10 @@ struct Downloader
/* Synchronously download a file. */
DownloadResult download(const DownloadRequest & request);
+ /* Download a file, writing its data to a sink. The sink will be
+ invoked on the thread of the caller. */
+ void download(DownloadRequest && request, Sink & sink);
+
/* Check if the specified file is already in ~/.cache/nix/tarballs
and is more recent than ‘tarball-ttl’ seconds. Otherwise,
use the recorded ETag to verify if the server has a more
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index b8d670417..6fdae40e3 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -77,11 +77,29 @@ protected:
}
}
- void getFile(const std::string & path,
- Callback<std::shared_ptr<std::string>> callback) override
+ DownloadRequest makeRequest(const std::string & path)
{
DownloadRequest request(cacheUri + "/" + path);
request.tries = 8;
+ return request;
+ }
+
+ void getFile(const std::string & path, Sink & sink) override
+ {
+ auto request(makeRequest(path));
+ try {
+ getDownloader()->download(std::move(request), sink);
+ } catch (DownloadError & e) {
+ if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
+ throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
+ throw;
+ }
+ }
+
+ void getFile(const std::string & path,
+ Callback<std::shared_ptr<std::string>> callback) override
+ {
+ auto request(makeRequest(path));
getDownloader()->enqueueDownload(request,
{[callback](std::future<DownloadResult> result) {