aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2019-06-24 21:48:52 +0200
committerEelco Dolstra <edolstra@gmail.com>2019-06-24 21:50:08 +0200
commita67cf5a3585c41dd9f219a2c7aa9cf67fa69520b (patch)
tree19cd95e120f118d66d241e50348c60384273e68c /src/libstore
parent15fa70cd1b853f5e62662b99ccb9ef3da6cfadff (diff)
Fix 'error 9 while decompressing xz file'
Once we've started writing data to a Sink, we can't restart a download request, because then we end up writing duplicate data to the Sink. Therefore we shouldn't handle retries in Downloader but at a higher level (in particular, in copyStorePath()). Fixes #2952.
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/binary-cache-store.cc18
-rw-r--r--src/libstore/download.cc67
-rw-r--r--src/libstore/download.hh11
-rw-r--r--src/libstore/http-binary-cache-store.cc57
-rw-r--r--src/libstore/store-api.cc94
5 files changed, 122 insertions, 125 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index 4527ee6ba..8b736056e 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -10,6 +10,8 @@
#include "nar-info-disk-cache.hh"
#include "nar-accessor.hh"
#include "json.hh"
+#include "retry.hh"
+#include "download.hh"
#include <chrono>
@@ -79,13 +81,15 @@ void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
{
- StringSink sink;
- try {
- getFile(path, sink);
- } catch (NoSuchBinaryCacheFile &) {
- return nullptr;
- }
- return sink.s;
+ return retry<std::shared_ptr<std::string>>(downloadSettings.tries, [&]() -> std::shared_ptr<std::string> {
+ StringSink sink;
+ try {
+ getFile(path, sink);
+ } catch (NoSuchBinaryCacheFile &) {
+ return nullptr;
+ }
+ return sink.s;
+ });
}
Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 70cfaf0dd..cf79b2af5 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -8,6 +8,7 @@
#include "compression.hh"
#include "pathlocks.hh"
#include "finally.hh"
+#include "retry.hh"
#ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h>
@@ -19,11 +20,9 @@
#include <curl/curl.h>
#include <algorithm>
-#include <cmath>
#include <cstring>
#include <iostream>
#include <queue>
-#include <random>
#include <thread>
using namespace std::string_literals;
@@ -46,9 +45,6 @@ struct CurlDownloader : public Downloader
{
CURLM * curlm = 0;
- std::random_device rd;
- std::mt19937 mt19937;
-
struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
{
CurlDownloader & downloader;
@@ -61,12 +57,6 @@ struct CurlDownloader : public Downloader
bool active = false; // whether the handle has been added to the multi object
std::string status;
- unsigned int attempt = 0;
-
- /* Don't start this download until the specified time point
- has been reached. */
- std::chrono::steady_clock::time_point embargo;
-
struct curl_slist * requestHeaders = 0;
std::string encoding;
@@ -385,9 +375,7 @@ struct CurlDownloader : public Downloader
}
}
- attempt++;
-
- auto exc =
+ fail(
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
: httpStatus != 0
@@ -398,31 +386,15 @@ struct CurlDownloader : public Downloader
)
: DownloadError(err,
fmt("unable to %s '%s': %s (%d)",
- request.verb(), request.uri, curl_easy_strerror(code), code));
-
- /* If this is a transient error, then maybe retry the
- download after a while. */
- if (err == Transient && attempt < request.tries) {
- int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
- printError(format("warning: %s; retrying in %d ms") % exc.what() % ms);
- embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
- downloader.enqueueItem(shared_from_this());
- }
- else
- fail(exc);
+ request.verb(), request.uri, curl_easy_strerror(code), code)));
}
}
};
struct State
{
- struct EmbargoComparator {
- bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
- return i1->embargo > i2->embargo;
- }
- };
bool quit = false;
- std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
+ std::vector<std::shared_ptr<DownloadItem>> incoming;
};
Sync<State> state_;
@@ -435,7 +407,6 @@ struct CurlDownloader : public Downloader
std::thread workerThread;
CurlDownloader()
- : mt19937(rd())
{
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
@@ -529,9 +500,7 @@ struct CurlDownloader : public Downloader
nextWakeup = std::chrono::steady_clock::time_point();
- /* Add new curl requests from the incoming requests queue,
- except for requests that are embargoed (waiting for a
- retry timeout to expire). */
+ /* Add new curl requests from the incoming requests queue. */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -540,22 +509,9 @@ struct CurlDownloader : public Downloader
}
std::vector<std::shared_ptr<DownloadItem>> incoming;
- auto now = std::chrono::steady_clock::now();
-
{
auto state(state_.lock());
- while (!state->incoming.empty()) {
- auto item = state->incoming.top();
- if (item->embargo <= now) {
- incoming.push_back(item);
- state->incoming.pop();
- } else {
- if (nextWakeup == std::chrono::steady_clock::time_point()
- || item->embargo < nextWakeup)
- nextWakeup = item->embargo;
- break;
- }
- }
+ std::swap(state->incoming, incoming);
quit = state->quit;
}
@@ -582,7 +538,7 @@ struct CurlDownloader : public Downloader
{
auto state(state_.lock());
- while (!state->incoming.empty()) state->incoming.pop();
+ state->incoming.clear();
state->quit = true;
}
}
@@ -598,7 +554,7 @@ struct CurlDownloader : public Downloader
auto state(state_.lock());
if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
- state->incoming.push(item);
+ state->incoming.push_back(item);
}
writeFull(wakeupPipe.writeSide.get(), " ");
}
@@ -681,7 +637,9 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
DownloadResult Downloader::download(const DownloadRequest & request)
{
- return enqueueDownload(request).get();
+ return retry<DownloadResult>(request.tries, [&]() {
+ return enqueueDownload(request).get();
+ });
}
void Downloader::download(DownloadRequest && request, Sink & sink)
@@ -868,7 +826,7 @@ CachedDownloadResult Downloader::downloadCached(
writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
} catch (DownloadError & e) {
if (storePath.empty()) throw;
- printError(format("warning: %1%; using cached result") % e.msg());
+ warn("%s; using cached result", e.msg());
result.etag = expectedETag;
}
}
@@ -931,5 +889,4 @@ bool isUri(const string & s)
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
}
-
}
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index c095ad053..7548b83ae 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -99,11 +99,13 @@ struct Downloader
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
- /* Synchronously download a file. */
+ /* Synchronously download a file. The request will be retried in
+ case of transient failures. */
DownloadResult download(const DownloadRequest & request);
/* Download a file, writing its data to a sink. The sink will be
- invoked on the thread of the caller. */
+ invoked on the thread of the caller. The request will not be
+ retried in case of transient failures. */
void download(DownloadRequest && request, Sink & sink);
/* Check if the specified file is already in ~/.cache/nix/tarballs
@@ -129,6 +131,11 @@ public:
DownloadError(Downloader::Error error, const FormatOrString & fs)
: Error(fs), error(error)
{ }
+
+ bool isTransient() override
+ {
+ return error == Downloader::Error::Transient;
+ }
};
bool isUri(const string & s);
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index 11c34fdac..ff2c10354 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -2,6 +2,7 @@
#include "download.hh"
#include "globals.hh"
#include "nar-info-disk-cache.hh"
+#include "retry.hh"
namespace nix {
@@ -113,7 +114,6 @@ protected:
DownloadRequest makeRequest(const std::string & path)
{
DownloadRequest request(cacheUri + "/" + path);
- request.tries = 8;
return request;
}
@@ -136,21 +136,46 @@ protected:
{
checkEnabled();
- auto request(makeRequest(path));
-
- getDownloader()->enqueueDownload(request,
- {[callback, this](std::future<DownloadResult> result) {
- try {
- callback(result.get().data);
- } catch (DownloadError & e) {
- if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
- return callback(std::shared_ptr<std::string>());
- maybeDisable();
- callback.rethrow();
- } catch (...) {
- callback.rethrow();
- }
- }});
+ struct State
+ {
+ DownloadRequest request;
+ std::function<void()> tryDownload;
+ unsigned int attempt = 0;
+ State(DownloadRequest && request) : request(request) {}
+ };
+
+ auto state = std::make_shared<State>(makeRequest(path));
+
+ state->tryDownload = [callback, state, this]() {
+ getDownloader()->enqueueDownload(state->request,
+ {[callback, state, this](std::future<DownloadResult> result) {
+ try {
+ callback(result.get().data);
+ } catch (DownloadError & e) {
+ if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
+ return callback(std::shared_ptr<std::string>());
+ ++state->attempt;
+ if (state->attempt < state->request.tries && e.isTransient()) {
+ auto ms = retrySleepTime(state->attempt);
+ warn("%s; retrying in %d ms", e.what(), ms);
+ /* We can't sleep here because that would
+ block the download thread. So use a
+ separate thread for sleeping. */
+ std::thread([state, ms]() {
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ state->tryDownload();
+ }).detach();
+ } else {
+ maybeDisable();
+ callback.rethrow();
+ }
+ } catch (...) {
+ callback.rethrow();
+ }
+ }});
+ };
+
+ state->tryDownload();
}
};
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index c5a771030..f577799da 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -6,10 +6,11 @@
#include "thread-pool.hh"
#include "json.hh"
#include "derivations.hh"
+#include "retry.hh"
+#include "download.hh"
#include <future>
-
namespace nix {
@@ -572,54 +573,57 @@ void Store::buildPaths(const PathSet & paths, BuildMode buildMode)
void copyStorePath(ref<Store> srcStore, ref<Store> dstStore,
const Path & storePath, RepairFlag repair, CheckSigsFlag checkSigs)
{
- auto srcUri = srcStore->getUri();
- auto dstUri = dstStore->getUri();
-
- Activity act(*logger, lvlInfo, actCopyPath,
- srcUri == "local" || srcUri == "daemon"
- ? fmt("copying path '%s' to '%s'", storePath, dstUri)
- : dstUri == "local" || dstUri == "daemon"
- ? fmt("copying path '%s' from '%s'", storePath, srcUri)
- : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
- {storePath, srcUri, dstUri});
- PushActivity pact(act.id);
-
- auto info = srcStore->queryPathInfo(storePath);
-
- uint64_t total = 0;
-
- if (!info->narHash) {
- StringSink sink;
- srcStore->narFromPath({storePath}, sink);
- auto info2 = make_ref<ValidPathInfo>(*info);
- info2->narHash = hashString(htSHA256, *sink.s);
- if (!info->narSize) info2->narSize = sink.s->size();
- if (info->ultimate) info2->ultimate = false;
- info = info2;
-
- StringSource source(*sink.s);
- dstStore->addToStore(*info, source, repair, checkSigs);
- return;
- }
+ retry<void>(downloadSettings.tries, [&]() {
- if (info->ultimate) {
- auto info2 = make_ref<ValidPathInfo>(*info);
- info2->ultimate = false;
- info = info2;
- }
+ auto srcUri = srcStore->getUri();
+ auto dstUri = dstStore->getUri();
+
+ Activity act(*logger, lvlInfo, actCopyPath,
+ srcUri == "local" || srcUri == "daemon"
+ ? fmt("copying path '%s' to '%s'", storePath, dstUri)
+ : dstUri == "local" || dstUri == "daemon"
+ ? fmt("copying path '%s' from '%s'", storePath, srcUri)
+ : fmt("copying path '%s' from '%s' to '%s'", storePath, srcUri, dstUri),
+ {storePath, srcUri, dstUri});
+ PushActivity pact(act.id);
+
+ auto info = srcStore->queryPathInfo(storePath);
+
+ uint64_t total = 0;
+
+ if (!info->narHash) {
+ StringSink sink;
+ srcStore->narFromPath({storePath}, sink);
+ auto info2 = make_ref<ValidPathInfo>(*info);
+ info2->narHash = hashString(htSHA256, *sink.s);
+ if (!info->narSize) info2->narSize = sink.s->size();
+ if (info->ultimate) info2->ultimate = false;
+ info = info2;
- auto source = sinkToSource([&](Sink & sink) {
- LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
- sink(data, len);
- total += len;
- act.progress(total, info->narSize);
+ StringSource source(*sink.s);
+ dstStore->addToStore(*info, source, repair, checkSigs);
+ return;
+ }
+
+ if (info->ultimate) {
+ auto info2 = make_ref<ValidPathInfo>(*info);
+ info2->ultimate = false;
+ info = info2;
+ }
+
+ auto source = sinkToSource([&](Sink & sink) {
+ LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+ sink(data, len);
+ total += len;
+ act.progress(total, info->narSize);
+ });
+ srcStore->narFromPath({storePath}, wrapperSink);
+ }, [&]() {
+ throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
});
- srcStore->narFromPath({storePath}, wrapperSink);
- }, [&]() {
- throw EndOfFile("NAR for '%s' fetched from '%s' is incomplete", storePath, srcStore->getUri());
- });
- dstStore->addToStore(*info, *source, repair, checkSigs);
+ dstStore->addToStore(*info, *source, repair, checkSigs);
+ });
}