diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 18 | ||||
-rw-r--r-- | src/libstore/download.cc | 98 | ||||
-rw-r--r-- | src/libstore/download.hh | 11 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 56 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 94 | ||||
-rw-r--r-- | src/libutil/retry.hh | 38 | ||||
-rw-r--r-- | src/libutil/types.hh | 2 | ||||
-rw-r--r-- | src/nix/copy.cc | 2 |
8 files changed, 151 insertions, 168 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 8b736056e..4527ee6ba 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -10,8 +10,6 @@ #include "nar-info-disk-cache.hh" #include "nar-accessor.hh" #include "json.hh" -#include "retry.hh" -#include "download.hh" #include <chrono> @@ -81,15 +79,13 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink) std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) { - return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> { - StringSink sink; - try { - getFile(path, sink); - } catch (NoSuchBinaryCacheFile &) { - return nullptr; - } - return sink.s; - }); + StringSink sink; + try { + getFile(path, sink); + } catch (NoSuchBinaryCacheFile &) { + return nullptr; + } + return sink.s; } Path BinaryCacheStore::narInfoFileFor(const Path & storePath) diff --git a/src/libstore/download.cc b/src/libstore/download.cc index 071eb5a96..aec68d627 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -8,7 +8,6 @@ #include "compression.hh" #include "pathlocks.hh" #include "finally.hh" -#include "retry.hh" #ifdef ENABLE_S3 #include <aws/core/client/ClientConfiguration.h> @@ -20,9 +19,11 @@ #include <curl/curl.h> #include <algorithm> +#include <cmath> #include <cstring> #include <iostream> #include <queue> +#include <random> #include <thread> using namespace std::string_literals; @@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader { CURLM * curlm = 0; + std::random_device rd; + std::mt19937 mt19937; + struct DownloadItem : public std::enable_shared_from_this<DownloadItem> { CurlDownloader & downloader; @@ -57,10 +61,20 @@ struct CurlDownloader : public Downloader bool active = false; // whether the handle has been added to the multi object std::string status; + unsigned int attempt = 0; + + /* Don't start this download until the specified time point + has been reached. */ + std::chrono::steady_clock::time_point embargo; + struct curl_slist * requestHeaders = 0; std::string encoding; + bool acceptRanges = false; + + curl_off_t writtenToSink = 0; + DownloadItem(CurlDownloader & downloader, const DownloadRequest & request, Callback<DownloadResult> callback) @@ -71,9 +85,10 @@ struct CurlDownloader : public Downloader {request.uri}, request.parentAct) , callback(callback) , finalSink([this](const unsigned char * data, size_t len) { - if (this->request.dataCallback) + if (this->request.dataCallback) { + writtenToSink += len; this->request.dataCallback((char *) data, len); - else + } else this->result.data->append((char *) data, len); }) { @@ -151,6 +166,7 @@ struct CurlDownloader : public Downloader status = ss.size() >= 2 ? ss[1] : ""; result.data = std::make_shared<std::string>(); result.bodySize = 0; + acceptRanges = false; encoding = ""; } else { auto i = line.find(':'); @@ -168,7 +184,9 @@ struct CurlDownloader : public Downloader return 0; } } else if (name == "content-encoding") - encoding = trim(string(line, i + 1));; + encoding = trim(string(line, i + 1)); + else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes") + acceptRanges = true; } } return realSize; @@ -286,6 +304,9 @@ struct CurlDownloader : public Downloader curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str()); curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL); + if (writtenToSink) + curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink); + result.data = std::make_shared<std::string>(); result.bodySize = 0; } @@ -320,7 +341,7 @@ struct CurlDownloader : public Downloader failEx(writeException); else if (code == CURLE_OK && - (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) + (httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; done = true; @@ -377,7 +398,9 @@ struct CurlDownloader : public Downloader } } - fail( + attempt++; + + auto exc = code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted ? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri)) : httpStatus != 0 @@ -388,15 +411,41 @@ struct CurlDownloader : public Downloader ) : DownloadError(err, fmt("unable to %s '%s': %s (%d)", - request.verb(), request.uri, curl_easy_strerror(code), code))); + request.verb(), request.uri, curl_easy_strerror(code), code)); + + /* If this is a transient error, then maybe retry the + download after a while. If we're writing to a + sink, we can only retry if the server supports + ranged requests. */ + if (err == Transient + && attempt < request.tries + && (!this->request.dataCallback + || writtenToSink == 0 + || (acceptRanges && encoding.empty()))) + { + int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937)); + if (writtenToSink) + warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms); + else + warn("%s; retrying in %d ms", exc.what(), ms); + embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); + downloader.enqueueItem(shared_from_this()); + } + else + fail(exc); } } }; struct State { + struct EmbargoComparator { + bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) { + return i1->embargo > i2->embargo; + } + }; bool quit = false; - std::vector<std::shared_ptr<DownloadItem>> incoming; + std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming; }; Sync<State> state_; @@ -409,6 +458,7 @@ struct CurlDownloader : public Downloader std::thread workerThread; CurlDownloader() + : mt19937(rd()) { static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); @@ -502,7 +552,9 @@ struct CurlDownloader : public Downloader nextWakeup = std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue. */ + /* Add new curl requests from the incoming requests queue, + except for requests that are embargoed (waiting for a + retry timeout to expire). */ if (extraFDs[0].revents & CURL_WAIT_POLLIN) { char buf[1024]; auto res = read(extraFDs[0].fd, buf, sizeof(buf)); @@ -511,9 +563,22 @@ struct CurlDownloader : public Downloader } std::vector<std::shared_ptr<DownloadItem>> incoming; + auto now = std::chrono::steady_clock::now(); + { auto state(state_.lock()); - std::swap(state->incoming, incoming); + while (!state->incoming.empty()) { + auto item = state->incoming.top(); + if (item->embargo <= now) { + incoming.push_back(item); + state->incoming.pop(); + } else { + if (nextWakeup == std::chrono::steady_clock::time_point() + || item->embargo < nextWakeup) + nextWakeup = item->embargo; + break; + } + } quit = state->quit; } @@ -535,12 +600,12 @@ struct CurlDownloader : public Downloader workerThreadMain(); } catch (nix::Interrupted & e) { } catch (std::exception & e) { - printError(format("unexpected error in download thread: %s") % e.what()); + printError("unexpected error in download thread: %s", e.what()); } { auto state(state_.lock()); - state->incoming.clear(); + while (!state->incoming.empty()) state->incoming.pop(); state->quit = true; } } @@ -556,7 +621,7 @@ struct CurlDownloader : public Downloader auto state(state_.lock()); if (state->quit) throw nix::Error("cannot enqueue download request because the download thread is shutting down"); - state->incoming.push_back(item); + state->incoming.push(item); } writeFull(wakeupPipe.writeSide.get(), " "); } @@ -639,9 +704,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & DownloadResult Downloader::download(const DownloadRequest & request) { - return retry<DownloadResult>(request.tries, [&]() { - return enqueueDownload(request).get(); - }); + return enqueueDownload(request).get(); } void Downloader::download(DownloadRequest && request, Sink & sink) @@ -828,7 +891,7 @@ CachedDownloadResult Downloader::downloadCached( writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n"); } catch (DownloadError & e) { if (storePath.empty()) throw; - warn("%s; using cached result", e.msg()); + warn("warning: %s; using cached result", e.msg()); result.etag = expectedETag; } } @@ -892,4 +955,5 @@ bool isUri(const string & s) return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh"; } + } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 7548b83ae..c095ad053 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -99,13 +99,11 @@ struct Downloader std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); - /* Synchronously download a file. The request will be retried in - case of transient failures. */ + /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); /* Download a file, writing its data to a sink. The sink will be - invoked on the thread of the caller. The request will not be - retried in case of transient failures. */ + invoked on the thread of the caller. */ void download(DownloadRequest && request, Sink & sink); /* Check if the specified file is already in ~/.cache/nix/tarballs @@ -131,11 +129,6 @@ public: DownloadError(Downloader::Error error, const FormatOrString & fs) : Error(fs), error(error) { } - - bool isTransient() override - { - return error == Downloader::Error::Transient; - } }; bool isUri(const string & s); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index ff2c10354..ea39d4f22 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -2,7 +2,6 @@ #include "download.hh" #include "globals.hh" #include "nar-info-disk-cache.hh" -#include "retry.hh" namespace nix { @@ -136,46 +135,21 @@ protected: { checkEnabled(); - struct State - { - DownloadRequest request; - std::function<void()> tryDownload; - unsigned int attempt = 0; - State(DownloadRequest && request) : request(request) {} - }; - - auto state = std::make_shared<State>(makeRequest(path)); - - state->tryDownload = [callback, state, this]() { - getDownloader()->enqueueDownload(state->request, - {[callback, state, this](std::future<DownloadResult> result) { - try { - callback(result.get().data); - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return callback(std::shared_ptr<std::string>()); - ++state->attempt; - if (state->attempt < state->request.tries && e.isTransient()) { - auto ms = retrySleepTime(state->attempt); - warn("%s; retrying in %d ms", e.what(), ms); - /* We can't sleep here because that would - block the download thread. So use a - separate thread for sleeping. */ - std::thread([state, ms]() { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - state->tryDownload(); - }).detach(); - } else { - maybeDisable(); - callback.rethrow(); - } - } catch (...) { - callback.rethrow(); - } - }}); - }; - - state->tryDownload(); + auto request(makeRequest(path)); + + getDownloader()->enqueueDownload(request, + {[callback, this](std::future<DownloadResult> result) { + try { + callback(result.get().data); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + return callback(std::shared_ptr<std::string>()); + maybeDisable(); + callback.rethrow(); + } catch (...) { + callback.rethrow(); + } + }}); } }; diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 77f52f0f3..e9042443c 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -6,11 +6,10 @@ #include "thread-pool.hh" #include "json.hh" #include "derivations.hh" -#include "retry.hh" -#include "download.hh" #include <future> + namespace nix { @@ -580,57 +579,54 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode) void copyStorePath(ref<Store> srcStore, ref<Store> dstStore, const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs) { - retry<void>(downloadSettings.tries, [&]() { - - auto srcUri = srcStore->getUri(); - auto dstUri = dstStore->getUri(); - - Activity act(*logger, lvlInfo, actCopyPath, - srcUri == "local" || srcUri == "daemon" - ? fmt("copying path '%s' to '%s'", storePath, dstUri) - : dstUri == "local" || dstUri == "daemon" - ? fmt("copying path '%s' from '%s'", storePath, srcUri) - : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), - {storePath, srcUri, dstUri}); - PushActivity pact(act.id); - - auto info = srcStore->queryPathInfo(storePath); - - uint64_t total = 0; - - if (!info->narHash) { - StringSink sink; - srcStore->narFromPath({storePath}, sink); - auto info2 = make_ref<ValidPathInfo>(*info); - info2->narHash = hashString(htSHA256, *sink.s); - if (!info->narSize) info2->narSize = sink.s->size(); - if (info->ultimate) info2->ultimate = false; - info = info2; - - StringSource source(*sink.s); - dstStore->addToStore(*info, source, repair, checkSigs); - return; - } + auto srcUri = srcStore->getUri(); + auto dstUri = dstStore->getUri(); + + Activity act(*logger, lvlInfo, actCopyPath, + srcUri == "local" || srcUri == "daemon" + ? fmt("copying path '%s' to '%s'", storePath, dstUri) + : dstUri == "local" || dstUri == "daemon" + ? fmt("copying path '%s' from '%s'", storePath, srcUri) + : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri), + {storePath, srcUri, dstUri}); + PushActivity pact(act.id); + + auto info = srcStore->queryPathInfo(storePath); + + uint64_t total = 0; + + if (!info->narHash) { + StringSink sink; + srcStore->narFromPath({storePath}, sink); + auto info2 = make_ref<ValidPathInfo>(*info); + info2->narHash = hashString(htSHA256, *sink.s); + if (!info->narSize) info2->narSize = sink.s->size(); + if (info->ultimate) info2->ultimate = false; + info = info2; + + StringSource source(*sink.s); + dstStore->addToStore(*info, source, repair, checkSigs); + return; + } - if (info->ultimate) { - auto info2 = make_ref<ValidPathInfo>(*info); - info2->ultimate = false; - info = info2; - } + if (info->ultimate) { + auto info2 = make_ref<ValidPathInfo>(*info); + info2->ultimate = false; + info = info2; + } - auto source = sinkToSource([&](Sink & sink) { - LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { - sink(data, len); - total += len; - act.progress(total, info->narSize); - }); - srcStore->narFromPath({storePath}, wrapperSink); - }, [&]() { - throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); + auto source = sinkToSource([&](Sink & sink) { + LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { + sink(data, len); + total += len; + act.progress(total, info->narSize); }); - - dstStore->addToStore(*info, *source, repair, checkSigs); + srcStore->narFromPath({storePath}, wrapperSink); + }, [&]() { + throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri()); }); + + dstStore->addToStore(*info, *source, repair, checkSigs); } diff --git a/src/libutil/retry.hh b/src/libutil/retry.hh deleted file mode 100644 index b45cb37f7..000000000 --- a/src/libutil/retry.hh +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "logging.hh" - -#include <functional> -#include <cmath> -#include <random> -#include <thread> - -namespace nix { - -inline unsigned int retrySleepTime(unsigned int attempt) -{ - std::random_device rd; - std::mt19937 mt19937; - return 250.0 * std::pow(2.0f, - attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(mt19937)); -} - -template<typename C> -C retry(unsigned int attempts, std::function<C()> && f) -{ - unsigned int attempt = 0; - while (true) { - try { - return f(); - } catch (BaseError & e) { - ++attempt; - if (attempt >= attempts || !e.isTransient()) - throw; - auto ms = retrySleepTime(attempt); - warn("%s; retrying in %d ms", e.what(), ms); - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - } - } -} - -} diff --git a/src/libutil/types.hh b/src/libutil/types.hh index 88e3243f4..92bf469b5 100644 --- a/src/libutil/types.hh +++ b/src/libutil/types.hh @@ -109,8 +109,6 @@ public: const string & msg() const { return err; } const string & prefix() const { return prefix_; } BaseError & addPrefix(const FormatOrString & fs); - - virtual bool isTransient() { return false; } }; #define MakeError(newClass, superClass) \ diff --git a/src/nix/copy.cc b/src/nix/copy.cc index 5f9051f40..b1aceb15c 100644 --- a/src/nix/copy.cc +++ b/src/nix/copy.cc @@ -36,7 +36,7 @@ struct CmdCopy : StorePathsCommand .set(&checkSigs, NoCheckSigs); mkFlag() - .longName("substitute") + .longName("substitute-on-destination") .shortName('s') .description("whether to try substitutes on the destination store (only supported by SSH)") .set(&substitute, Substitute); |