diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16 18:54:14 +0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16 18:54:14 +0200 |
commit | 75989bdca773eedb8b8d1cc8a7675900358acd25 (patch) | |
tree | 2d1dce1431662f441cead67d8754e96eb4db6807 /src/libstore/download.cc | |
parent | 054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff) |
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a
thread for every concurrent binary cache lookup, even though they end
up being handled by the same download thread. Requiring hundreds of
threads is not a good idea. So now there is an asynchronous version of
queryPathInfo() that takes a callback function to process the
result. Similarly, enqueueDownload() now takes a callback rather than
returning a future.
Thus, a command like
nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5
that returns 4941 paths now takes 1.87s using only 2 threads (the main
thread and the downloader thread). (This is with a prewarmed
CloudFront.)
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r-- | src/libstore/download.cc | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc index b2d223da9..ca324595a 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader CurlDownloader & downloader; DownloadRequest request; DownloadResult result; - bool done = false; // whether the promise has been set - std::promise<DownloadResult> promise; + bool done = false; // whether either the success or failure function has been called + std::function<void(const DownloadResult &)> success; + std::function<void(std::exception_ptr exc)> failure; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader if (requestHeaders) curl_slist_free_all(requestHeaders); try { if (!done) - fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri)); + fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)); } catch (...) { ignoreException(); } @@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader template<class T> void fail(const T & e) { - promise.set_exception(std::make_exception_ptr(e)); + assert(!done); done = true; + failure(std::make_exception_ptr(e)); } size_t writeCallback(void * contents, size_t size, size_t nmemb) @@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; - promise.set_value(result); + success(result); done = true; } else { Error err = @@ -253,9 +255,11 @@ struct CurlDownloader : public Downloader attempt++; auto exc = - httpStatus != 0 - ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) - : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); + code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted + ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri) + : httpStatus != 0 + ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) + : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); /* If this is a transient error, then maybe retry the download after a while. */ @@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader { try { workerThreadMain(); - } catch (Interrupted & e) { + } catch (nix::Interrupted & e) { } catch (std::exception & e) { printMsg(lvlError, format("unexpected error in download thread: %s") % e.what()); } @@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader writeFull(wakeupPipe.writeSide.get(), " "); } - std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override + void enqueueDownload(const DownloadRequest & request, + std::function<void(const DownloadResult &)> success, + std::function<void(std::exception_ptr exc)> failure) override { auto item = std::make_shared<DownloadItem>(*this, request); + item->success = success; + item->failure = failure; enqueueItem(item); - return item->promise.get_future(); } }; @@ -458,6 +465,15 @@ ref<Downloader> makeDownloader() return make_ref<CurlDownloader>(); } +std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request) +{ + auto promise = std::make_shared<std::promise<DownloadResult>>(); + enqueueDownload(request, + [promise](const DownloadResult & result) { promise->set_value(result); }, + [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + return promise->get_future(); +} + DownloadResult Downloader::download(const DownloadRequest & request) { return enqueueDownload(request).get(); |