aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16 18:54:14 +0200
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-09-16 18:54:14 +0200
commit75989bdca773eedb8b8d1cc8a7675900358acd25 (patch)
tree2d1dce1431662f441cead67d8754e96eb4db6807 /src
parent054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff)
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a thread for every concurrent binary cache lookup, even though they end up being handled by the same download thread. Requiring hundreds of threads is not a good idea. So now there is an asynchronous version of queryPathInfo() that takes a callback function to process the result. Similarly, enqueueDownload() now takes a callback rather than returning a future. Thus, a command like nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5 that returns 4941 paths now takes 1.87s using only 2 threads (the main thread and the downloader thread). (This is with a prewarmed CloudFront.)
Diffstat (limited to 'src')
-rw-r--r--src/libstore/binary-cache-store.cc32
-rw-r--r--src/libstore/binary-cache-store.hh10
-rw-r--r--src/libstore/download.cc38
-rw-r--r--src/libstore/download.hh10
-rw-r--r--src/libstore/http-binary-cache-store.cc25
-rw-r--r--src/libstore/local-binary-cache-store.cc24
-rw-r--r--src/libstore/local-store.cc59
-rw-r--r--src/libstore/local-store.hh4
-rw-r--r--src/libstore/misc.cc104
-rw-r--r--src/libstore/remote-store.cc64
-rw-r--r--src/libstore/remote-store.hh4
-rw-r--r--src/libstore/s3-binary-cache-store.cc56
-rw-r--r--src/libstore/store-api.cc98
-rw-r--r--src/libstore/store-api.hh9
-rw-r--r--src/libutil/util.cc11
-rw-r--r--src/libutil/util.hh39
16 files changed, 385 insertions, 202 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index e71ea6a57..0ffbd6e55 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -12,6 +12,8 @@
#include <chrono>
+#include <future>
+
namespace nix {
BinaryCacheStore::BinaryCacheStore(const Params & params)
@@ -58,6 +60,19 @@ void BinaryCacheStore::notImpl()
throw Error("operation not implemented for binary cache stores");
}
+std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
+{
+ std::promise<std::shared_ptr<std::string>> promise;
+ getFile(path,
+ [&](std::shared_ptr<std::string> result) {
+ promise.set_value(result);
+ },
+ [&](std::exception_ptr exc) {
+ promise.set_exception(exc);
+ });
+ return promise.get_future().get();
+}
+
Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
{
assertStorePath(storePath);
@@ -176,17 +191,22 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
sink((unsigned char *) nar->c_str(), nar->size());
}
-std::shared_ptr<ValidPathInfo> BinaryCacheStore::queryPathInfoUncached(const Path & storePath)
+void BinaryCacheStore::queryPathInfoUncached(const Path & storePath,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure)
{
auto narInfoFile = narInfoFileFor(storePath);
- auto data = getFile(narInfoFile);
- if (!data) return 0;
- auto narInfo = make_ref<NarInfo>(*this, *data, narInfoFile);
+ getFile(narInfoFile,
+ [=](std::shared_ptr<std::string> data) {
+ if (!data) return success(0);
- stats.narInfoRead++;
+ stats.narInfoRead++;
- return std::shared_ptr<NarInfo>(narInfo);
+ callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>)
+ std::make_shared<NarInfo>(*this, *data, narInfoFile));
+ },
+ failure);
}
Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath,
diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh
index 2d10179f3..41671b7d9 100644
--- a/src/libstore/binary-cache-store.hh
+++ b/src/libstore/binary-cache-store.hh
@@ -31,7 +31,11 @@ protected:
/* Return the contents of the specified file, or null if it
doesn't exist. */
- virtual std::shared_ptr<std::string> getFile(const std::string & path) = 0;
+ virtual void getFile(const std::string & path,
+ std::function<void(std::shared_ptr<std::string>)> success,
+ std::function<void(std::exception_ptr exc)> failure) = 0;
+
+ std::shared_ptr<std::string> getFile(const std::string & path);
bool wantMassQuery_ = false;
int priority = 50;
@@ -56,7 +60,9 @@ public:
PathSet queryAllValidPaths() override
{ notImpl(); }
- std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
+ void queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path,
PathSet & referrers) override
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index b2d223da9..ca324595a 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader
CurlDownloader & downloader;
DownloadRequest request;
DownloadResult result;
- bool done = false; // whether the promise has been set
- std::promise<DownloadResult> promise;
+ bool done = false; // whether either the success or failure function has been called
+ std::function<void(const DownloadResult &)> success;
+ std::function<void(std::exception_ptr exc)> failure;
CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object
std::string status;
@@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader
if (requestHeaders) curl_slist_free_all(requestHeaders);
try {
if (!done)
- fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri));
+ fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri));
} catch (...) {
ignoreException();
}
@@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader
template<class T>
void fail(const T & e)
{
- promise.set_exception(std::make_exception_ptr(e));
+ assert(!done);
done = true;
+ failure(std::make_exception_ptr(e));
}
size_t writeCallback(void * contents, size_t size, size_t nmemb)
@@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader
(httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
{
result.cached = httpStatus == 304;
- promise.set_value(result);
+ success(result);
done = true;
} else {
Error err =
@@ -253,9 +255,11 @@ struct CurlDownloader : public Downloader
attempt++;
auto exc =
- httpStatus != 0
- ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
- : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
+ code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
+ ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)
+ : httpStatus != 0
+ ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus)
+ : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
/* If this is a transient error, then maybe retry the
download after a while. */
@@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader
{
try {
workerThreadMain();
- } catch (Interrupted & e) {
+ } catch (nix::Interrupted & e) {
} catch (std::exception & e) {
printMsg(lvlError, format("unexpected error in download thread: %s") % e.what());
}
@@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader
writeFull(wakeupPipe.writeSide.get(), " ");
}
- std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override
+ void enqueueDownload(const DownloadRequest & request,
+ std::function<void(const DownloadResult &)> success,
+ std::function<void(std::exception_ptr exc)> failure) override
{
auto item = std::make_shared<DownloadItem>(*this, request);
+ item->success = success;
+ item->failure = failure;
enqueueItem(item);
- return item->promise.get_future();
}
};
@@ -458,6 +465,15 @@ ref<Downloader> makeDownloader()
return make_ref<CurlDownloader>();
}
+std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request)
+{
+ auto promise = std::make_shared<std::promise<DownloadResult>>();
+ enqueueDownload(request,
+ [promise](const DownloadResult & result) { promise->set_value(result); },
+ [promise](std::exception_ptr exc) { promise->set_exception(exc); });
+ return promise->get_future();
+}
+
DownloadResult Downloader::download(const DownloadRequest & request)
{
return enqueueDownload(request).get();
diff --git a/src/libstore/download.hh b/src/libstore/download.hh
index 6b90ff202..82b5d641f 100644
--- a/src/libstore/download.hh
+++ b/src/libstore/download.hh
@@ -23,8 +23,6 @@ struct DownloadRequest
struct DownloadResult
{
- enum Status { Success, NotFound, Forbidden, Misc, Transient };
- Status status;
bool cached;
std::string etag;
std::string effectiveUrl;
@@ -38,7 +36,11 @@ struct Downloader
/* Enqueue a download request, returning a future to the result of
the download. The future may throw a DownloadError
exception. */
- virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0;
+ virtual void enqueueDownload(const DownloadRequest & request,
+ std::function<void(const DownloadResult &)> success,
+ std::function<void(std::exception_ptr exc)> failure) = 0;
+
+ std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
/* Synchronously download a file. */
DownloadResult download(const DownloadRequest & request);
@@ -50,7 +52,7 @@ struct Downloader
Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "",
const Hash & expectedHash = Hash(), string * effectiveUri = nullptr);
- enum Error { NotFound, Forbidden, Misc, Transient };
+ enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};
/* Return a shared Downloader object. Using this object is preferred
diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index 91ee6fcb6..60728de04 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -69,18 +69,27 @@ protected:
throw UploadToHTTP("uploading to an HTTP binary cache is not supported");
}
- std::shared_ptr<std::string> getFile(const std::string & path) override
+ void getFile(const std::string & path,
+ std::function<void(std::shared_ptr<std::string>)> success,
+ std::function<void(std::exception_ptr exc)> failure)
{
DownloadRequest request(cacheUri + "/" + path);
request.showProgress = DownloadRequest::no;
request.tries = 8;
- try {
- return getDownloader()->download(request).data;
- } catch (DownloadError & e) {
- if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
- return 0;
- throw;
- }
+
+ getDownloader()->enqueueDownload(request,
+ [success](const DownloadResult & result) {
+ success(result.data);
+ },
+ [success, failure](std::exception_ptr exc) {
+ try {
+ std::rethrow_exception(exc);
+ } catch (DownloadError & e) {
+ if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
+ success(0);
+ failure(exc);
+ }
+ });
}
};
diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc
index 91d2650fe..0f377989b 100644
--- a/src/libstore/local-binary-cache-store.cc
+++ b/src/libstore/local-binary-cache-store.cc
@@ -32,7 +32,19 @@ protected:
void upsertFile(const std::string & path, const std::string & data) override;
- std::shared_ptr<std::string> getFile(const std::string & path) override;
+ void getFile(const std::string & path,
+ std::function<void(std::shared_ptr<std::string>)> success,
+ std::function<void(std::exception_ptr exc)> failure) override
+ {
+ sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
+ try {
+ return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path));
+ } catch (SysError & e) {
+ if (e.errNo == ENOENT) return std::shared_ptr<std::string>();
+ throw;
+ }
+ });
+ }
PathSet queryAllValidPaths() override
{
@@ -76,16 +88,6 @@ void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::stri
atomicWrite(binaryCacheDir + "/" + path, data);
}
-std::shared_ptr<std::string> LocalBinaryCacheStore::getFile(const std::string & path)
-{
- try {
- return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path));
- } catch (SysError & e) {
- if (e.errNo == ENOENT) return 0;
- throw;
- }
-}
-
static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>
diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc
index 10056f2f1..466cea727 100644
--- a/src/libstore/local-store.cc
+++ b/src/libstore/local-store.cc
@@ -577,49 +577,54 @@ Hash parseHashField(const Path & path, const string & s)
}
-std::shared_ptr<ValidPathInfo> LocalStore::queryPathInfoUncached(const Path & path)
+void LocalStore::queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure)
{
- auto info = std::make_shared<ValidPathInfo>();
- info->path = path;
+ sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
- assertStorePath(path);
+ auto info = std::make_shared<ValidPathInfo>();
+ info->path = path;
- return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() {
- auto state(_state.lock());
+ assertStorePath(path);
+
+ return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() {
+ auto state(_state.lock());
- /* Get the path info. */
- auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path));
+ /* Get the path info. */
+ auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path));
- if (!useQueryPathInfo.next())
- return std::shared_ptr<ValidPathInfo>();
+ if (!useQueryPathInfo.next())
+ return std::shared_ptr<ValidPathInfo>();
- info->id = useQueryPathInfo.getInt(0);
+ info->id = useQueryPathInfo.getInt(0);
- info->narHash = parseHashField(path, useQueryPathInfo.getStr(1));
+ info->narHash = parseHashField(path, useQueryPathInfo.getStr(1));
- info->registrationTime = useQueryPathInfo.getInt(2);
+ info->registrationTime = useQueryPathInfo.getInt(2);
- auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3);
- if (s) info->deriver = s;
+ auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3);
+ if (s) info->deriver = s;
- /* Note that narSize = NULL yields 0. */
- info->narSize = useQueryPathInfo.getInt(4);
+ /* Note that narSize = NULL yields 0. */
+ info->narSize = useQueryPathInfo.getInt(4);
- info->ultimate = useQueryPathInfo.getInt(5) == 1;
+ info->ultimate = useQueryPathInfo.getInt(5) == 1;
- s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6);
- if (s) info->sigs = tokenizeString<StringSet>(s, " ");
+ s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6);
+ if (s) info->sigs = tokenizeString<StringSet>(s, " ");
- s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7);
- if (s) info->ca = s;
+ s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7);
+ if (s) info->ca = s;
- /* Get the references. */
- auto useQueryReferences(state->stmtQueryReferences.use()(info->id));
+ /* Get the references. */
+ auto useQueryReferences(state->stmtQueryReferences.use()(info->id));
- while (useQueryReferences.next())
- info->references.insert(useQueryReferences.getStr(0));
+ while (useQueryReferences.next())
+ info->references.insert(useQueryReferences.getStr(0));
- return info;
+ return info;
+ });
});
}
diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh
index 5b5960cf2..24188130d 100644
--- a/src/libstore/local-store.hh
+++ b/src/libstore/local-store.hh
@@ -106,7 +106,9 @@ public:
PathSet queryAllValidPaths() override;
- std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
+ void queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path, PathSet & referrers) override;
diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc
index da654ba0d..0c2c49e55 100644
--- a/src/libstore/misc.cc
+++ b/src/libstore/misc.cc
@@ -8,66 +8,90 @@
namespace nix {
-void Store::computeFSClosure(const Path & path,
- PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers)
+void Store::computeFSClosure(const Path & startPath,
+ PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
- ThreadPool pool;
+ struct State
+ {
+ size_t pending;
+ PathSet & paths;
+ std::exception_ptr exc;
+ };
- Sync<bool> state_;
+ Sync<State> state_(State{0, paths_, 0});
- std::function<void(Path)> doPath;
+ std::function<void(const Path &)> enqueue;
- doPath = [&](const Path & path) {
+ std::condition_variable done;
+
+ enqueue = [&](const Path & path) -> void {
{
auto state(state_.lock());
- if (paths.count(path)) return;
- paths.insert(path);
+ if (state->exc) return;
+ if (state->paths.count(path)) return;
+ state->paths.insert(path);
+ state->pending++;
}
- auto info = queryPathInfo(path);
+ queryPathInfo(path,
+ [&, path](ref<ValidPathInfo> info) {
+ // FIXME: calls to isValidPath() should be async
- if (flipDirection) {
+ if (flipDirection) {
- PathSet referrers;
- queryReferrers(path, referrers);
- for (auto & ref : referrers)
- if (ref != path)
- pool.enqueue(std::bind(doPath, ref));
+ PathSet referrers;
+ queryReferrers(path, referrers);
+ for (auto & ref : referrers)
+ if (ref != path)
+ enqueue(ref);
- if (includeOutputs) {
- PathSet derivers = queryValidDerivers(path);
- for (auto & i : derivers)
- pool.enqueue(std::bind(doPath, i));
- }
+ if (includeOutputs)
+ for (auto & i : queryValidDerivers(path))
+ enqueue(i);
- if (includeDerivers && isDerivation(path)) {
- PathSet outputs = queryDerivationOutputs(path);
- for (auto & i : outputs)
- if (isValidPath(i) && queryPathInfo(i)->deriver == path)
- pool.enqueue(std::bind(doPath, i));
- }
+ if (includeDerivers && isDerivation(path))
+ for (auto & i : queryDerivationOutputs(path))
+ if (isValidPath(i) && queryPathInfo(i)->deriver == path)
+ enqueue(i);
- } else {
+ } else {
- for (auto & ref : info->references)
- if (ref != path)
- pool.enqueue(std::bind(doPath, ref));
+ for (auto & ref : info->references)
+ if (ref != path)
+ enqueue(ref);
- if (includeOutputs && isDerivation(path)) {
- PathSet outputs = queryDerivationOutputs(path);
- for (auto & i : outputs)
- if (isValidPath(i)) pool.enqueue(std::bind(doPath, i));
- }
+ if (includeOutputs && isDerivation(path))
+ for (auto & i : queryDerivationOutputs(path))
+ if (isValidPath(i)) enqueue(i);
- if (includeDerivers && isValidPath(info->deriver))
- pool.enqueue(std::bind(doPath, info->deriver));
+ if (includeDerivers && isValidPath(info->deriver))
+ enqueue(info->deriver);
- }
+ }
+
+ {
+ auto state(state_.lock());
+ assert(state->pending);
+ if (!--state->pending) done.notify_one();
+ }
+
+ },
+
+ [&, path](std::exception_ptr exc) {
+ auto state(state_.lock());
+ if (!state->exc) state->exc = exc;
+ assert(state->pending);
+ if (!--state->pending) done.notify_one();
+ });
};
- pool.enqueue(std::bind(doPath, path));
+ enqueue(startPath);
- pool.process();
+ {
+ auto state(state_.lock());
+ while (state->pending) state.wait(done);
+ if (state->exc) std::rethrow_exception(state->exc);
+ }
}
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index 94075f3b9..7b73557a5 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -246,36 +246,40 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
}
-std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & path)
-{
- auto conn(connections->get());
- conn->to << wopQueryPathInfo << path;
- try {
- conn->processStderr();
- } catch (Error & e) {
- // Ugly backwards compatibility hack.
- if (e.msg().find("is not valid") != std::string::npos)
- throw InvalidPath(e.what());
- throw;
- }
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
- bool valid = readInt(conn->from) != 0;
- if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path);
- }
- auto info = std::make_shared<ValidPathInfo>();
- info->path = path;
- info->deriver = readString(conn->from);
- if (info->deriver != "") assertStorePath(info->deriver);
- info->narHash = parseHash(htSHA256, readString(conn->from));
- info->references = readStorePaths<PathSet>(*this, conn->from);
- info->registrationTime = readInt(conn->from);
- info->narSize = readLongLong(conn->from);
- if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
- info->ultimate = readInt(conn->from) != 0;
- info->sigs = readStrings<StringSet>(conn->from);
- info->ca = readString(conn->from);
- }
- return info;
+void RemoteStore::queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure)
+{
+ sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
+ auto conn(connections->get());
+ conn->to << wopQueryPathInfo << path;
+ try {
+ conn->processStderr();
+ } catch (Error & e) {
+ // Ugly backwards compatibility hack.
+ if (e.msg().find("is not valid") != std::string::npos)
+ throw InvalidPath(e.what());
+ throw;
+ }
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) {
+ bool valid = readInt(conn->from) != 0;
+ if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path);
+ }
+ auto info = std::make_shared<ValidPathInfo>();
+ info->path = path;
+ info->deriver = readString(conn->from);
+ if (info->deriver != "") assertStorePath(info->deriver);
+ info->narHash = parseHash(htSHA256, readString(conn->from));
+ info->references = readStorePaths<PathSet>(*this, conn->from);
+ info->registrationTime = readInt(conn->from);
+ info->narSize = readLongLong(conn->from);
+ if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
+ info->ultimate = readInt(conn->from) != 0;
+ info->sigs = readStrings<StringSet>(conn->from);
+ info->ca = readString(conn->from);
+ }
+ return info;
+ });
}
diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh
index e756805ea..9879337d6 100644
--- a/src/libstore/remote-store.hh
+++ b/src/libstore/remote-store.hh
@@ -34,7 +34,9 @@ public:
PathSet queryAllValidPaths() override;
- std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
+ void queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path, PathSet & referrers) override;
diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index ed95620bb..0722e43d5 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -167,46 +167,50 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
stats.putTimeMs += duration;
}
- std::shared_ptr<std::string> getFile(const std::string & path) override
+ void getFile(const std::string & path,
+ std::function<void(std::shared_ptr<std::string>)> success,
+ std::function<void(std::exception_ptr exc)> failure) override
{
- debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
+ sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
+ debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path);
- auto request =
- Aws::S3::Model::GetObjectRequest()
- .WithBucket(bucketName)
- .WithKey(path);
+ auto request =
+ Aws::S3::Model::GetObjectRequest()
+ .WithBucket(bucketName)
+ .WithKey(path);
- request.SetResponseStreamFactory([&]() {
- return Aws::New<std::stringstream>("STRINGSTREAM");
- });
+ request.SetResponseStreamFactory([&]() {
+ return Aws::New<std::stringstream>("STRINGSTREAM");
+ });
- stats.get++;
+ stats.get++;
- try {
+ try {
- auto now1 = std::chrono::steady_clock::now();
+ auto now1 = std::chrono::steady_clock::now();
- auto result = checkAws(format("AWS error fetching ‘%s’") % path,
- client->GetObject(request));
+ auto result = checkAws(format("AWS error fetching ‘%s’") % path,
+ client->GetObject(request));
- auto now2 = std::chrono::steady_clock::now();
+ auto now2 = std::chrono::steady_clock::now();
- auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
+ auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
- printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
- % bucketName % path % res.size() % duration);
+ printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms")
+ % bucketName % path % res.size() % duration);
- stats.getBytes += res.size();
- stats.getTimeMs += duration;
+ stats.getBytes += res.size();
+ stats.getTimeMs += duration;
- return std::make_shared<std::string>(res);
+ return std::make_shared<std::string>(res);
- } catch (S3Error & e) {
- if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0;
- throw;
- }
+ } catch (S3Error & e) {
+ if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
+ throw;
+ }
+ });
}
PathSet queryAllValidPaths() override
diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc
index 5dd56f905..427170843 100644
--- a/src/libstore/store-api.cc
+++ b/src/libstore/store-api.cc
@@ -4,6 +4,8 @@
#include "util.hh"
#include "nar-info-disk-cache.hh"
+#include <future>
+
namespace nix {
@@ -283,51 +285,79 @@ bool Store::isValidPath(const Path & storePath)
ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath)
{
+ std::promise<ref<ValidPathInfo>> promise;
+
+ queryPathInfo(storePath,
+ [&](ref<ValidPathInfo> info) {
+ promise.set_value(info);
+ },
+ [&](std::exception_ptr exc) {
+ promise.set_exception(exc);
+ });
+
+ return promise.get_future().get();
+}
+
+
+void Store::queryPathInfo(const Path & storePath,
+ std::function<void(ref<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure)
+{
auto hashPart = storePathToHash(storePath);
- {
- auto state_(state.lock());
- auto res = state_->pathInfoCache.get(hashPart);
- if (res) {
- stats.narInfoReadAverted++;
- if (!*res)
- throw InvalidPath(format("path ‘%s’ is not valid") % storePath);
- return ref<ValidPathInfo>(*res);
+ try {
+
+ {
+ auto res = state.lock()->pathInfoCache.get(hashPart);
+ if (res) {
+ stats.narInfoReadAverted++;
+ if (!*res)
+ throw InvalidPath(format("path ‘%s’ is not valid") % storePath);
+ return success(ref<ValidPathInfo>(*res));
+ }
}
- }
- if (diskCache) {
- auto res = diskCache->lookupNarInfo(getUri(), hashPart);
- if (res.first != NarInfoDiskCache::oUnknown) {
- stats.narInfoReadAverted++;
- auto state_(state.lock());
- state_->pathInfoCache.upsert(hashPart,
- res.first == NarInfoDiskCache::oInvalid ? 0 : res.second);
- if (res.first == NarInfoDiskCache::oInvalid ||
- (res.second->path != storePath && storePathToName(storePath) != ""))
- throw InvalidPath(format("path ‘%s’ is not valid") % storePath);
- return ref<ValidPathInfo>(res.second);
+ if (diskCache) {
+ auto res = diskCache->lookupNarInfo(getUri(), hashPart);
+ if (res.first != NarInfoDiskCache::oUnknown) {
+ stats.narInfoReadAverted++;
+ {
+ auto state_(state.lock());
+ state_->pathInfoCache.upsert(hashPart,
+ res.first == NarInfoDiskCache::oInvalid ? 0 : res.second);
+ if (res.first == NarInfoDiskCache::oInvalid ||
+ (res.second->path != storePath && storePathToName(storePath) != ""))
+ throw InvalidPath(format("path ‘%s’ is not valid") % storePath);
+ }
+ return success(ref<ValidPathInfo>(res.second));
+ }
}
+
+ } catch (std::exception & e) {
+ return callFailure(failure);
}
- auto info = queryPathInfoUncached(storePath);
+ queryPathInfoUncached(storePath,
+ [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) {
- if (diskCache)
- diskCache->upsertNarInfo(getUri(), hashPart, info);
+ if (diskCache)
+ diskCache->upsertNarInfo(getUri(), hashPart, info);
- {
- auto state_(state.lock());
- state_->pathInfoCache.upsert(hashPart, info);
- }
+ {
+ auto state_(state.lock());
+ state_->pathInfoCache.upsert(hashPart, info);
+ }
- if (!info
- || (info->path != storePath && storePathToName(storePath) != ""))
- {
- stats.narInfoMissing++;
- throw InvalidPath(format("path ‘%s’ is not valid") % storePath);
- }
+ if (!info
+ || (info->path != storePath && storePathToName(storePath) != ""))
+ {
+ stats.narInfoMissing++;
+ return failure(std::make_exception_ptr(InvalidPath(format("path ‘%s’ is not valid") % storePath)));
+ }
+
+ callSuccess(success, failure, ref<ValidPathInfo>(info));
- return ref<ValidPathInfo>(info);
+ }, failure);
}
diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh
index 41fc58fc4..cba4deaad 100644
--- a/src/libstore/store-api.hh
+++ b/src/libstore/store-api.hh
@@ -319,9 +319,16 @@ public:
the name part of the store path. */
ref<const ValidPathInfo> queryPathInfo(const Path & path);
+ /* Asynchronous version of queryPathInfo(). */
+ void queryPathInfo(const Path & path,
+ std::function<void(ref<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure);
+
protected:
- virtual std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) = 0;
+ virtual void queryPathInfoUncached(const Path & path,
+ std::function<void(std::shared_ptr<ValidPathInfo>)> success,
+ std::function<void(std::exception_ptr exc)> failure) = 0;
public:
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 8e029fb48..1750e0373 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -1215,4 +1215,15 @@ string base64Decode(const string & s)
}
+void callFailure(const std::function<void(std::exception_ptr exc)> & failure)
+{
+ try {
+ failure(std::current_exception());
+ } catch (std::exception & e) {
+ printMsg(lvlError, format("uncaught exception: %s") % e.what());
+ abort();
+ }
+}
+
+
}
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 9bf548326..182a38fb3 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -376,4 +376,43 @@ string get(const T & map, const string & key, const string & def = "")
}
+/* Call ‘failure’ with the current exception as argument. If ‘failure’
+ throws an exception, abort the program. */
+void callFailure(const std::function<void(std::exception_ptr exc)> & failure);
+
+
+/* Evaluate the function ‘f’. If it returns a value, call ‘success’
+ with that value as its argument. If it or ‘success’ throws an
+ exception, call ‘failure’. If ‘failure’ throws an exception, abort
+ the program. */
+template<class T>
+void sync2async(
+ const std::function<void(T)> & success,
+ const std::function<void(std::exception_ptr exc)> & failure,
+ const std::function<T()> & f)
+{
+ try {
+ success(f());
+ } catch (...) {
+ callFailure(failure);
+ }
+}
+
+
+/* Call the function ‘success’. If it throws an exception, call
+ ‘failure’. If that throws an exception, abort the program. */
+template<class T>
+void callSuccess(
+ const std::function<void(T)> & success,
+ const std::function<void(std::exception_ptr exc)> & failure,
+ T && arg)
+{
+ try {
+ success(arg);
+ } catch (...) {
+ callFailure(failure);
+ }
+}
+
+
}