aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/libexpr/common-opts.cc2
-rw-r--r--src/libexpr/parser.y2
-rw-r--r--src/libexpr/primops.cc2
-rw-r--r--src/libstore/builtins.cc10
-rw-r--r--src/libstore/download.cc550
-rw-r--r--src/libstore/download.hh35
-rw-r--r--src/libstore/http-binary-cache-store.cc26
-rwxr-xr-xsrc/nix-channel/nix-channel.cc8
-rw-r--r--src/nix-prefetch-url/nix-prefetch-url.cc2
9 files changed, 430 insertions, 207 deletions
diff --git a/src/libexpr/common-opts.cc b/src/libexpr/common-opts.cc
index 8a7989aac..06d6ed87d 100644
--- a/src/libexpr/common-opts.cc
+++ b/src/libexpr/common-opts.cc
@@ -55,7 +55,7 @@ bool parseSearchPathArg(Strings::iterator & i,
Path lookupFileArg(EvalState & state, string s)
{
if (isUri(s))
- return makeDownloader()->downloadCached(state.store, s, true);
+ return getDownloader()->downloadCached(state.store, s, true);
else if (s.size() > 2 && s.at(0) == '<' && s.at(s.size() - 1) == '>') {
Path p = s.substr(1, s.size() - 2);
return state.findFile(p);
diff --git a/src/libexpr/parser.y b/src/libexpr/parser.y
index 776e5cb39..3f7eed16f 100644
--- a/src/libexpr/parser.y
+++ b/src/libexpr/parser.y
@@ -662,7 +662,7 @@ std::pair<bool, std::string> EvalState::resolveSearchPathElem(const SearchPathEl
// FIXME: support specifying revision/branch
res = { true, exportGit(store, elem.second, "master") };
else
- res = { true, makeDownloader()->downloadCached(store, elem.second, true) };
+ res = { true, getDownloader()->downloadCached(store, elem.second, true) };
} catch (DownloadError & e) {
printMsg(lvlError, format("warning: Nix search path entry ‘%1%’ cannot be downloaded, ignoring") % elem.second);
res = { false, "" };
diff --git a/src/libexpr/primops.cc b/src/libexpr/primops.cc
index 3b965f209..e46000084 100644
--- a/src/libexpr/primops.cc
+++ b/src/libexpr/primops.cc
@@ -1769,7 +1769,7 @@ void fetch(EvalState & state, const Pos & pos, Value * * args, Value & v,
if (state.restricted && !expectedHash)
throw Error(format("‘%1%’ is not allowed in restricted mode") % who);
- Path res = makeDownloader()->downloadCached(state.store, url, unpack, name, expectedHash);
+ Path res = getDownloader()->downloadCached(state.store, url, unpack, name, expectedHash);
mkString(v, res, PathSet({res}));
}
diff --git a/src/libstore/builtins.cc b/src/libstore/builtins.cc
index d3194a905..a30f30906 100644
--- a/src/libstore/builtins.cc
+++ b/src/libstore/builtins.cc
@@ -17,13 +17,15 @@ void builtinFetchurl(const BasicDerivation & drv)
auto fetch = [&](const string & url) {
/* No need to do TLS verification, because we check the hash of
the result anyway. */
- DownloadOptions options;
- options.verifyTLS = false;
+ DownloadRequest request(url);
+ request.verifyTLS = false;
/* Show a progress indicator, even though stderr is not a tty. */
- options.showProgress = DownloadOptions::yes;
+ request.showProgress = DownloadRequest::yes;
- auto data = makeDownloader()->download(url, options);
+ /* Note: have to use a fresh downloader here because we're in
+ a forked process. */
+ auto data = makeDownloader()->download(request);
assert(data.data);
return data.data;
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index ed7e124d2..2aca0a975 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -5,10 +5,15 @@
#include "store-api.hh"
#include "archive.hh"
+#include <unistd.h>
+#include <fcntl.h>
+
#include <curl/curl.h>
#include <iostream>
#include <thread>
+#include <cmath>
+#include <random>
namespace nix {
@@ -30,225 +35,428 @@ std::string resolveUri(const std::string & uri)
struct CurlDownloader : public Downloader
{
- CURL * curl;
- ref<std::string> data;
- string etag, status, expectedETag, effectiveUrl;
-
- struct curl_slist * requestHeaders;
+ CURLM * curlm = 0;
- bool showProgress;
- double prevProgressTime{0}, startTime{0};
- unsigned int moveBack{1};
+ std::random_device rd;
+ std::mt19937 mt19937;
- size_t writeCallback(void * contents, size_t size, size_t nmemb)
+ struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
{
- size_t realSize = size * nmemb;
- data->append((char *) contents, realSize);
- return realSize;
- }
+ CurlDownloader & downloader;
+ DownloadRequest request;
+ DownloadResult result;
+ bool done = false; // whether the promise has been set
+ std::promise<DownloadResult> promise;
+ CURL * req = 0;
+ bool active = false; // whether the handle has been added to the multi object
+ std::string status;
+
+ bool showProgress = false;
+ double prevProgressTime{0}, startTime{0};
+ unsigned int moveBack{1};
+
+ unsigned int attempt = 0;
+
+ /* Don't start this download until the specified time point
+ has been reached. */
+ std::chrono::steady_clock::time_point embargo;
+
+ struct curl_slist * requestHeaders = 0;
+
+ DownloadItem(CurlDownloader & downloader, const DownloadRequest & request)
+ : downloader(downloader), request(request)
+ {
+ showProgress =
+ request.showProgress == DownloadRequest::yes ||
+ (request.showProgress == DownloadRequest::automatic && isatty(STDERR_FILENO));
+
+ if (!request.expectedETag.empty())
+ requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
+ }
- static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
- {
- return ((CurlDownloader *) userp)->writeCallback(contents, size, nmemb);
- }
+ ~DownloadItem()
+ {
+ if (req) {
+ if (active)
+ curl_multi_remove_handle(downloader.curlm, req);
+ curl_easy_cleanup(req);
+ }
+ if (requestHeaders) curl_slist_free_all(requestHeaders);
+ try {
+ if (!done)
+ fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri));
+ } catch (...) {
+ ignoreException();
+ }
+ }
- size_t headerCallback(void * contents, size_t size, size_t nmemb)
- {
- size_t realSize = size * nmemb;
- string line = string((char *) contents, realSize);
- printMsg(lvlVomit, format("got header: %1%") % trim(line));
- if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
- etag = "";
- auto ss = tokenizeString<vector<string>>(line, " ");
- status = ss.size() >= 2 ? ss[1] : "";
- } else {
- auto i = line.find(':');
- if (i != string::npos) {
- string name = trim(string(line, 0, i));
- if (name == "ETag") { // FIXME: case
- etag = trim(string(line, i + 1));
- /* Hack to work around a GitHub bug: it sends
- ETags, but ignores If-None-Match. So if we get
- the expected ETag on a 200 response, then shut
- down the connection because we already have the
- data. */
- printMsg(lvlDebug, format("got ETag: %1%") % etag);
- if (etag == expectedETag && status == "200") {
- printMsg(lvlDebug, format("shutting down on 200 HTTP response with expected ETag"));
- return 0;
+ template<class T>
+ void fail(const T & e)
+ {
+ promise.set_exception(std::make_exception_ptr(e));
+ done = true;
+ }
+
+ size_t writeCallback(void * contents, size_t size, size_t nmemb)
+ {
+ size_t realSize = size * nmemb;
+ result.data->append((char *) contents, realSize);
+ return realSize;
+ }
+
+ static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
+ {
+ return ((DownloadItem *) userp)->writeCallback(contents, size, nmemb);
+ }
+
+ size_t headerCallback(void * contents, size_t size, size_t nmemb)
+ {
+ size_t realSize = size * nmemb;
+ std::string line((char *) contents, realSize);
+ printMsg(lvlVomit, format("got header for ‘%s’: %s") % request.uri % trim(line));
+ if (line.compare(0, 5, "HTTP/") == 0) { // new response starts
+ result.etag = "";
+ auto ss = tokenizeString<vector<string>>(line, " ");
+ status = ss.size() >= 2 ? ss[1] : "";
+ result.data = std::make_shared<std::string>();
+ } else {
+ auto i = line.find(':');
+ if (i != string::npos) {
+ string name = toLower(trim(string(line, 0, i)));
+ if (name == "etag") {
+ result.etag = trim(string(line, i + 1));
+ /* Hack to work around a GitHub bug: it sends
+ ETags, but ignores If-None-Match. So if we get
+ the expected ETag on a 200 response, then shut
+ down the connection because we already have the
+ data. */
+ if (result.etag == request.expectedETag && status == "200") {
+ debug(format("shutting down on 200 HTTP response with expected ETag"));
+ return 0;
+ }
}
}
}
+ return realSize;
}
- return realSize;
- }
- static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
- {
- return ((CurlDownloader *) userp)->headerCallback(contents, size, nmemb);
- }
+ static size_t headerCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
+ {
+ return ((DownloadItem *) userp)->headerCallback(contents, size, nmemb);
+ }
- int progressCallback(double dltotal, double dlnow)
- {
- if (showProgress) {
- double now = getTime();
- if (prevProgressTime <= now - 1) {
- string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]")
- % (dlnow / 1024.0)
- % (dltotal / 1024.0)
- % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str();
- std::cerr << "\e[" << moveBack << "D" << s;
- moveBack = s.size();
+ int progressCallback(double dltotal, double dlnow)
+ {
+ if (showProgress) {
+ double now = getTime();
+ if (prevProgressTime <= now - 1) {
+ string s = (format(" [%1$.0f/%2$.0f KiB, %3$.1f KiB/s]")
+ % (dlnow / 1024.0)
+ % (dltotal / 1024.0)
+ % (now == startTime ? 0 : dlnow / 1024.0 / (now - startTime))).str();
+ std::cerr << "\e[" << moveBack << "D" << s;
+ moveBack = s.size();
+ std::cerr.flush();
+ prevProgressTime = now;
+ }
+ }
+ return _isInterrupted;
+ }
+
+ static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
+ {
+ return ((DownloadItem *) userp)->progressCallback(dltotal, dlnow);
+ }
+
+ void init()
+ {
+ // FIXME: handle parallel downloads.
+ if (showProgress) {
+ std::cerr << (format("downloading ‘%1%’... ") % request.uri);
std::cerr.flush();
- prevProgressTime = now;
+ startTime = getTime();
}
+
+ if (!req) req = curl_easy_init();
+
+ curl_easy_reset(req);
+ curl_easy_setopt(req, CURLOPT_URL, request.uri.c_str());
+ curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
+ curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
+ curl_easy_setopt(req, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str());
+ curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
+ curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
+ curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, DownloadItem::writeCallbackWrapper);
+ curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
+ curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, DownloadItem::headerCallbackWrapper);
+ curl_easy_setopt(req, CURLOPT_HEADERDATA, this);
+
+ curl_easy_setopt(req, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
+ curl_easy_setopt(req, CURLOPT_PROGRESSDATA, this);
+ curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
+
+ curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
+
+ if (request.head)
+ curl_easy_setopt(req, CURLOPT_NOBODY, 1);
+
+ if (request.verifyTLS)
+ curl_easy_setopt(req, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str());
+ else {
+ curl_easy_setopt(req, CURLOPT_SSL_VERIFYPEER, 0);
+ curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
+ }
+
+ result.data = std::make_shared<std::string>();
}
- return _isInterrupted;
- }
- static int progressCallbackWrapper(void * userp, double dltotal, double dlnow, double ultotal, double ulnow)
- {
- return ((CurlDownloader *) userp)->progressCallback(dltotal, dlnow);
- }
+ void finish(CURLcode code)
+ {
+ if (showProgress)
+ //std::cerr << "\e[" << moveBack << "D\e[K\n";
+ std::cerr << "\n";
- CurlDownloader()
- : data(make_ref<std::string>())
- {
- requestHeaders = 0;
+ long httpStatus = 0;
+ curl_easy_getinfo(req, CURLINFO_RESPONSE_CODE, &httpStatus);
- curl = curl_easy_init();
- if (!curl) throw nix::Error("unable to initialize curl");
- }
+ char * effectiveUrlCStr;
+ curl_easy_getinfo(req, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
+ if (effectiveUrlCStr)
+ result.effectiveUrl = effectiveUrlCStr;
- ~CurlDownloader()
- {
- if (curl) curl_easy_cleanup(curl);
- if (requestHeaders) curl_slist_free_all(requestHeaders);
- }
+ debug(format("finished download of ‘%s’; curl status = %d, HTTP status = %d, body = %d bytes")
+ % request.uri % code % httpStatus % (result.data ? result.data->size() : 0));
- bool fetch(const string & url, const DownloadOptions & options)
- {
- showProgress =
- options.showProgress == DownloadOptions::yes ||
- (options.showProgress == DownloadOptions::automatic && isatty(STDERR_FILENO));
+ if (code == CURLE_WRITE_ERROR && result.etag == request.expectedETag) {
+ code = CURLE_OK;
+ httpStatus = 304;
+ }
+
+ if (code == CURLE_OK &&
+ (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
+ {
+ result.cached = httpStatus == 304;
+ promise.set_value(result);
+ done = true;
+ } else {
+ Error err =
+ (httpStatus == 404 || code == CURLE_FILE_COULDNT_READ_FILE) ? NotFound :
+ httpStatus == 403 ? Forbidden :
+ (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
+ || httpStatus == 504 || httpStatus == 522 || httpStatus == 524
+ || code == CURLE_COULDNT_RESOLVE_HOST) ? Transient :
+ Misc;
- curl_easy_reset(curl);
+ attempt++;
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_USERAGENT, ("Nix/" + nixVersion).c_str());
- curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
+ auto exc =
+ httpStatus != 0
+ ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
+ : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
+
+ /* If this is a transient error, then maybe retry the
+ download after a while. */
+ if (err == Transient && attempt < request.tries) {
+ int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+ printMsg(lvlError, format("warning: %s; retrying in %d ms") % exc.what() % ms);
+ embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+ downloader.enqueueItem(shared_from_this());
+ }
+ else
+ fail(exc);
+ }
+ }
+ };
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallbackWrapper);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) this);
+ struct State
+ {
+ bool quit = false;
+ std::vector<std::shared_ptr<DownloadItem>> incoming;
+ };
- curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, headerCallbackWrapper);
- curl_easy_setopt(curl, CURLOPT_HEADERDATA, (void *) this);
+ Sync<State> state_;
- curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallbackWrapper);
- curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, (void *) this);
- curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0);
+ /* We can't use a std::condition_variable to wake up the curl
+ thread, because it only monitors file descriptors. So use a
+ pipe instead. */
+ Pipe wakeupPipe;
- curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+ std::thread workerThread;
- curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+ CurlDownloader()
+ {
+ static std::once_flag globalInit;
+ std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
- if (options.verifyTLS)
- curl_easy_setopt(curl, CURLOPT_CAINFO, getEnv("SSL_CERT_FILE", "/etc/ssl/certs/ca-certificates.crt").c_str());
- else {
- curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0);
- curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0);
- }
+ curlm = curl_multi_init();
- data = make_ref<std::string>();
+ curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
+ curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, 25); // FIXME: configurable
- if (requestHeaders) {
- curl_slist_free_all(requestHeaders);
- requestHeaders = 0;
- }
+ wakeupPipe.create();
+ fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
+
+ workerThread = std::thread([&]() { workerThreadEntry(); });
+ }
- if (!options.expectedETag.empty()) {
- this->expectedETag = options.expectedETag;
- requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + options.expectedETag).c_str());
+ ~CurlDownloader()
+ {
+ /* Signal the worker thread to exit. */
+ {
+ auto state(state_.lock());
+ state->quit = true;
}
+ writeFull(wakeupPipe.writeSide.get(), " ");
- curl_easy_setopt(curl, CURLOPT_HTTPHEADER, requestHeaders);
+ workerThread.join();
- if (options.head)
- curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
+ if (curlm) curl_multi_cleanup(curlm);
+ }
- if (showProgress) {
- std::cerr << (format("downloading ‘%1%’... ") % url);
- std::cerr.flush();
- startTime = getTime();
- }
+ void workerThreadMain()
+ {
+ std::map<CURL *, std::shared_ptr<DownloadItem>> items;
+
+ bool quit;
+
+ std::chrono::steady_clock::time_point nextWakeup;
+
+ while (!quit) {
+ checkInterrupt();
+
+ /* Let curl do its thing. */
+ int running;
+ CURLMcode mc = curl_multi_perform(curlm, &running);
+ if (mc != CURLM_OK)
+ throw nix::Error(format("unexpected error from curl_multi_perform(): %s") % curl_multi_strerror(mc));
+
+ /* Set the promises of any finished requests. */
+ CURLMsg * msg;
+ int left;
+ while ((msg = curl_multi_info_read(curlm, &left))) {
+ if (msg->msg == CURLMSG_DONE) {
+ auto i = items.find(msg->easy_handle);
+ assert(i != items.end());
+ i->second->finish(msg->data.result);
+ curl_multi_remove_handle(curlm, i->second->req);
+ i->second->active = false;
+ items.erase(i);
+ }
+ }
- CURLcode res = curl_easy_perform(curl);
- if (showProgress)
- //std::cerr << "\e[" << moveBack << "D\e[K\n";
- std::cerr << "\n";
- checkInterrupt();
- if (res == CURLE_WRITE_ERROR && etag == options.expectedETag) return false;
-
- long httpStatus = -1;
- curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus);
-
- if (res != CURLE_OK) {
- Error err =
- httpStatus == 404 ? NotFound :
- httpStatus == 403 ? Forbidden :
- (httpStatus == 408 || httpStatus == 500 || httpStatus == 503
- || httpStatus == 504 || httpStatus == 522 || httpStatus == 524
- || res == CURLE_COULDNT_RESOLVE_HOST) ? Transient :
- Misc;
- if (res == CURLE_HTTP_RETURNED_ERROR && httpStatus != -1)
- throw DownloadError(err, format("unable to download ‘%s’: HTTP error %d")
- % url % httpStatus);
- else
- throw DownloadError(err, format("unable to download ‘%s’: %s (%d)")
- % url % curl_easy_strerror(res) % res);
- }
+ /* Wait for activity, including wakeup events. */
+ int numfds = 0;
+ struct curl_waitfd extraFDs[1];
+ extraFDs[0].fd = wakeupPipe.readSide.get();
+ extraFDs[0].events = CURL_WAIT_POLLIN;
+ extraFDs[0].revents = 0;
+ auto sleepTimeMs =
+ nextWakeup != std::chrono::steady_clock::time_point()
+ ? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
+ : 1000000000;
+ //printMsg(lvlVomit, format("download thread waiting for %d ms") % sleepTimeMs);
+ mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
+ if (mc != CURLM_OK)
+ throw nix::Error(format("unexpected error from curl_multi_wait(): %s") % curl_multi_strerror(mc));
+
+ nextWakeup = std::chrono::steady_clock::time_point();
+
+ /* Add new curl requests from the incoming requests queue,
+ except for requests that are embargoed (waiting for a
+ retry timeout to expire). FIXME: should use a priority
+ queue for the embargoed items to prevent repeated O(n)
+ checks. */
+ if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
+ char buf[1024];
+ auto res = read(extraFDs[0].fd, buf, sizeof(buf));
+ if (res == -1 && errno != EINTR)
+ throw SysError("reading curl wakeup socket");
+ }
- char *effectiveUrlCStr;
- curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effectiveUrlCStr);
- if (effectiveUrlCStr)
- effectiveUrl = effectiveUrlCStr;
+ std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed;
+ auto now = std::chrono::steady_clock::now();
+
+ {
+ auto state(state_.lock());
+ for (auto & item: state->incoming) {
+ if (item->embargo <= now)
+ incoming.push_back(item);
+ else {
+ embargoed.push_back(item);
+ if (nextWakeup == std::chrono::steady_clock::time_point()
+ || item->embargo < nextWakeup)
+ nextWakeup = item->embargo;
+ }
+ }
+ state->incoming = embargoed;
+ quit = state->quit;
+ }
- if (httpStatus == 304) return false;
+ for (auto & item : incoming) {
+ debug(format("starting download of %s") % item->request.uri);
+ item->init();
+ curl_multi_add_handle(curlm, item->req);
+ item->active = true;
+ items[item->req] = item;
+ }
+ }
- return true;
+ debug("download thread shutting down");
}
- DownloadResult download(string url, const DownloadOptions & options) override
+ void workerThreadEntry()
{
- size_t attempt = 0;
+ try {
+ workerThreadMain();
+ } catch (Interrupted & e) {
+ } catch (std::exception & e) {
+ printMsg(lvlError, format("unexpected error in download thread: %s") % e.what());
+ }
- while (true) {
- try {
- DownloadResult res;
- if (fetch(resolveUri(url), options)) {
- res.cached = false;
- res.data = data;
- } else
- res.cached = true;
- res.effectiveUrl = effectiveUrl;
- res.etag = etag;
- return res;
- } catch (DownloadError & e) {
- attempt++;
- if (e.error != Transient || attempt >= options.tries) throw;
- auto ms = options.baseRetryTimeMs * (1 << (attempt - 1));
- printMsg(lvlError, format("warning: %s; retrying in %d ms") % e.what() % ms);
- std::this_thread::sleep_for(std::chrono::milliseconds(ms));
- }
+ {
+ auto state(state_.lock());
+ state->incoming.clear();
+ state->quit = true;
}
}
+
+ void enqueueItem(std::shared_ptr<DownloadItem> item)
+ {
+ {
+ auto state(state_.lock());
+ if (state->quit)
+ throw nix::Error("cannot enqueue download request because the download thread is shutting down");
+ state->incoming.push_back(item);
+ }
+ writeFull(wakeupPipe.writeSide.get(), " ");
+ }
+
+ std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override
+ {
+ auto item = std::make_shared<DownloadItem>(*this, request);
+ enqueueItem(item);
+ return item->promise.get_future();
+ }
};
+ref<Downloader> getDownloader()
+{
+ static std::shared_ptr<Downloader> downloader;
+ static std::once_flag downloaderCreated;
+ std::call_once(downloaderCreated, [&]() { downloader = makeDownloader(); });
+ return ref<Downloader>(downloader);
+}
+
ref<Downloader> makeDownloader()
{
return make_ref<CurlDownloader>();
}
+DownloadResult Downloader::download(const DownloadRequest & request)
+{
+ return enqueueDownload(request).get();
+}
+
Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl)
{
auto url = resolveUri(url_);
@@ -303,9 +511,9 @@ Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpa
if (!skip) {
try {
- DownloadOptions options;
- options.expectedETag = expectedETag;
- auto res = download(url, options);
+ DownloadRequest request(url);
+ request.expectedETag = expectedETag;
+ auto res = download(request);
if (effectiveUrl)
*effectiveUrl = res.effectiveUrl;
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 011b85f47..6b90ff202 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -4,24 +4,30 @@
#include "hash.hh"
#include <string>
+#include <future>
namespace nix {
-struct DownloadOptions
+struct DownloadRequest
{
+ std::string uri;
std::string expectedETag;
bool verifyTLS = true;
enum { yes, no, automatic } showProgress = yes;
bool head = false;
size_t tries = 1;
- unsigned int baseRetryTimeMs = 100;
+ unsigned int baseRetryTimeMs = 250;
+
+ DownloadRequest(const std::string & uri) : uri(uri) { }
};
struct DownloadResult
{
+ enum Status { Success, NotFound, Forbidden, Misc, Transient };
+ Status status;
bool cached;
- string etag;
- string effectiveUrl;
+ std::string etag;
+ std::string effectiveUrl;
std::shared_ptr<std::string> data;
};
@@ -29,14 +35,29 @@ class Store;
struct Downloader
{
- virtual DownloadResult download(string url, const DownloadOptions & options) = 0;
+ /* Enqueue a download request, returning a future to the result of
+ the download. The future may throw a DownloadError
+ exception. */
+ virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0;
+
+ /* Synchronously download a file. */
+ DownloadResult download(const DownloadRequest & request);
- Path downloadCached(ref<Store> store, const string & url, bool unpack, string name = "",
- const Hash & expectedHash = Hash(), string * effectiveUrl = nullptr);
+ /* Check if the specified file is already in ~/.cache/nix/tarballs
+ and is more recent than ‘tarball-ttl’ seconds. Otherwise,
+ use the recorded ETag to verify if the server has a more
+ recent version, and if so, download it to the Nix store. */
+ Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "",
+ const Hash & expectedHash = Hash(), string * effectiveUri = nullptr);
enum Error { NotFound, Forbidden, Misc, Transient };
};
+/* Return a shared Downloader object. Using this object is preferred
+ because it enables connection reuse and HTTP/2 multiplexing. */
+ref<Downloader> getDownloader();
+
+/* Return a new Downloader object. */
ref<Downloader> makeDownloader();
class DownloadError : public Error
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index bdcd2fd39..91ee6fcb6 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -13,17 +13,12 @@ private:
Path cacheUri;
- Pool<Downloader> downloaders;
-
public:
HttpBinaryCacheStore(
const Params & params, const Path & _cacheUri)
: BinaryCacheStore(params)
, cacheUri(_cacheUri)
- , downloaders(
- std::numeric_limits<size_t>::max(),
- []() { return makeDownloader(); })
{
if (cacheUri.back() == '/')
cacheUri.pop_back();
@@ -54,12 +49,11 @@ protected:
bool fileExists(const std::string & path) override
{
try {
- auto downloader(downloaders.get());
- DownloadOptions options;
- options.showProgress = DownloadOptions::no;
- options.head = true;
- options.tries = 5;
- downloader->download(cacheUri + "/" + path, options);
+ DownloadRequest request(cacheUri + "/" + path);
+ request.showProgress = DownloadRequest::no;
+ request.head = true;
+ request.tries = 5;
+ getDownloader()->download(request);
return true;
} catch (DownloadError & e) {
/* S3 buckets return 403 if a file doesn't exist and the
@@ -77,13 +71,11 @@ protected:
std::shared_ptr<std::string> getFile(const std::string & path) override
{
- auto downloader(downloaders.get());
- DownloadOptions options;
- options.showProgress = DownloadOptions::no;
- options.tries = 5;
- options.baseRetryTimeMs = 1000;
+ DownloadRequest request(cacheUri + "/" + path);
+ request.showProgress = DownloadRequest::no;
+ request.tries = 8;
try {
- return downloader->download(cacheUri + "/" + path, options).data;
+ return getDownloader()->download(request).data;
} catch (DownloadError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
return 0;
diff --git a/src/nix-channel/nix-channel.cc b/src/nix-channel/nix-channel.cc
index 0f7858aa5..5b4c21819 100755
--- a/src/nix-channel/nix-channel.cc
+++ b/src/nix-channel/nix-channel.cc
@@ -85,7 +85,7 @@ static void update(const StringSet & channelNames)
// got redirected in the process, so that we can grab the various parts of a nix channel
// definition from a consistent location if the redirect changes mid-download.
auto effectiveUrl = string{};
- auto dl = makeDownloader();
+ auto dl = getDownloader();
auto filename = dl->downloadCached(store, url, false, "", Hash(), &effectiveUrl);
url = chomp(std::move(effectiveUrl));
@@ -114,10 +114,10 @@ static void update(const StringSet & channelNames)
if (!unpacked) {
// The URL doesn't unpack directly, so let's try treating it like a full channel folder with files in it
// Check if the channel advertises a binary cache.
- DownloadOptions opts;
- opts.showProgress = DownloadOptions::no;
+ DownloadRequest request(url + "/binary-cache-url");
+ request.showProgress = DownloadRequest::no;
try {
- auto dlRes = dl->download(url + "/binary-cache-url", opts);
+ auto dlRes = dl->download(request);
extraAttrs = "binaryCacheURL = \"" + *dlRes.data + "\";";
} catch (DownloadError & e) {
}
diff --git a/src/nix-prefetch-url/nix-prefetch-url.cc b/src/nix-prefetch-url/nix-prefetch-url.cc
index 00f5ae28d..2bf2b2e5c 100644
--- a/src/nix-prefetch-url/nix-prefetch-url.cc
+++ b/src/nix-prefetch-url/nix-prefetch-url.cc
@@ -158,7 +158,7 @@ int main(int argc, char * * argv)
auto actualUri = resolveMirrorUri(state, uri);
/* Download the file. */
- auto result = makeDownloader()->download(actualUri, DownloadOptions());
+ auto result = getDownloader()->download(DownloadRequest(actualUri));
AutoDelete tmpDir(createTempDir(), true);
Path tmpFile = (Path) tmpDir + "/tmp";