aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2019-07-10 19:46:15 +0200
committerEelco Dolstra <edolstra@gmail.com>2019-07-10 19:46:15 +0200
commit03f09e1d18c39706b917964f14f9e40513e4aaea (patch)
tree3d8e762c0fdf100c9027787e48345fb8b5f73cf5 /src/libstore/download.cc
parentaa739e783993864aa6e0c8a4820e6b59f4626d92 (diff)
Revert "Fix 'error 9 while decompressing xz file'"
This reverts commit 78fa47a7f08a4cb6ee7061bf0bd86a40e1d6dc91.
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc67
1 files changed, 55 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 7a2af237e..2267b5d35 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,7 +8,6 @@
#include "compression.hh"
#include "pathlocks.hh"
#include "finally.hh"
-#include "retry.hh"
#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
@@ -20,9 +19,11 @@
#include <curl/curl.h>
#include <algorithm>
+#include <cmath>
#include <cstring>
#include <iostream>
#include <queue>
+#include <random>
#include <thread>
using namespace std::string_literals;
@@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
{
CURLM * curlm = 0;
+ std::random_device rd;
+ std::mt19937 mt19937;
+
struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
{
CurlDownloader & downloader;
@@ -57,6 +61,12 @@ struct CurlDownloader : public Downloader
bool active = false; // whether the handle has been added to the multi object
std::string status;
+ unsigned int attempt = 0;
+
+ /* Don't start this download until the specified time point
+ has been reached. */
+ std::chrono::steady_clock::time_point embargo;
+
struct curl_slist * requestHeaders = 0;
std::string encoding;
@@ -377,7 +387,9 @@ struct CurlDownloader : public Downloader
}
}
- fail(
+ attempt++;
+
+ auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
: httpStatus != 0
@@ -388,15 +400,31 @@ struct CurlDownloader : public Downloader
)
: DownloadError(err,
fmt("unable to %s '%s': %s (%d)",
- request.verb(), request.uri, curl_easy_strerror(code), code)));
+ request.verb(), request.uri, curl_easy_strerror(code), code));
+
+ /* If this is a transient error, then maybe retry the
+ download after a while. */
+ if (err == Transient && attempt < request.tries) {
+ int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
+ printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
+ embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
+ downloader.enqueueItem(shared_from_this());
+ }
+ else
+ fail(exc);
}
}
};
struct State
{
+ struct EmbargoComparator {
+ bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+ return i1->embargo > i2->embargo;
+ }
+ };
bool quit = false;
- std::vector<std::shared_ptr<DownloadItem>> incoming;
+ std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
};
Sync<State> state_;
@@ -409,6 +437,7 @@ struct CurlDownloader : public Downloader
std::thread workerThread;
CurlDownloader()
+ : mt19937(rd())
{
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -502,7 +531,9 @@ struct CurlDownloader : public Downloader
nextWakeup = std::chrono::steady_clock::time_point();
- /* Add new curl requests from the incoming requests queue. */
+ /* Add new curl requests from the incoming requests queue,
+ except for requests that are embargoed (waiting for a
+ retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -511,9 +542,22 @@ struct CurlDownloader : public Downloader
}
std::vector<std::shared_ptr<DownloadItem>> incoming;
+ auto now = std::chrono::steady_clock::now();
+
{
auto state(state_.lock());
- std::swap(state->incoming, incoming);
+ while (!state->incoming.empty()) {
+ auto item = state->incoming.top();
+ if (item->embargo <= now) {
+ incoming.push_back(item);
+ state->incoming.pop();
+ } else {
+ if (nextWakeup == std::chrono::steady_clock::time_point()
+ || item->embargo < nextWakeup)
+ nextWakeup = item->embargo;
+ break;
+ }
+ }
quit = state->quit;
}
@@ -540,7 +584,7 @@ struct CurlDownloader : public Downloader
{
auto state(state_.lock());
- state->incoming.clear();
+ while (!state->incoming.empty()) state->incoming.pop();
state->quit = true;
}
}
@@ -556,7 +600,7 @@ struct CurlDownloader : public Downloader
auto state(state_.lock());
if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
- state->incoming.push_back(item);
+ state->incoming.push(item);
}
writeFull(wakeupPipe.writeSide.get(), " ");
}
@@ -639,9 +683,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
DownloadResult Downloader::download(const DownloadRequest & request)
{
- return retry<DownloadResult>(request.tries, [&]() {
- return enqueueDownload(request).get();
- });
+ return enqueueDownload(request).get();
}
void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -827,7 +869,7 @@ CachedDownloadResult Downloader::downloadCached(
writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
} catch (DownloadError & e) {
if (storePath.empty()) throw;
- warn("%s; using cached result", e.msg());
+ printError(format("warning: %1%; using cached result") % e.msg());
result.etag = expectedETag;
}
}
@@ -878,4 +920,5 @@ bool isUri(const string & s)
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
}
+
}