diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16 18:54:14 +0200 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-09-16 18:54:14 +0200 |
commit | 75989bdca773eedb8b8d1cc8a7675900358acd25 (patch) | |
tree | 2d1dce1431662f441cead67d8754e96eb4db6807 /src | |
parent | 054be5025762c5e1c7e853c4fa5d7eed8da1727f (diff) |
Make computeFSClosure() single-threaded again
The fact that queryPathInfo() is synchronous meant that we needed a
thread for every concurrent binary cache lookup, even though they end
up being handled by the same download thread. Requiring hundreds of
threads is not a good idea. So now there is an asynchronous version of
queryPathInfo() that takes a callback function to process the
result. Similarly, enqueueDownload() now takes a callback rather than
returning a future.
Thus, a command like
nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5
that returns 4941 paths now takes 1.87s using only 2 threads (the main
thread and the downloader thread). (This is with a prewarmed
CloudFront.)
Diffstat (limited to 'src')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 32 | ||||
-rw-r--r-- | src/libstore/binary-cache-store.hh | 10 | ||||
-rw-r--r-- | src/libstore/download.cc | 38 | ||||
-rw-r--r-- | src/libstore/download.hh | 10 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 25 | ||||
-rw-r--r-- | src/libstore/local-binary-cache-store.cc | 24 | ||||
-rw-r--r-- | src/libstore/local-store.cc | 59 | ||||
-rw-r--r-- | src/libstore/local-store.hh | 4 | ||||
-rw-r--r-- | src/libstore/misc.cc | 104 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 64 | ||||
-rw-r--r-- | src/libstore/remote-store.hh | 4 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 56 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 98 | ||||
-rw-r--r-- | src/libstore/store-api.hh | 9 | ||||
-rw-r--r-- | src/libutil/util.cc | 11 | ||||
-rw-r--r-- | src/libutil/util.hh | 39 |
16 files changed, 385 insertions, 202 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index e71ea6a57..0ffbd6e55 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -12,6 +12,8 @@ #include <chrono> +#include <future> + namespace nix { BinaryCacheStore::BinaryCacheStore(const Params & params) @@ -58,6 +60,19 @@ void BinaryCacheStore::notImpl() throw Error("operation not implemented for binary cache stores"); } +std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) +{ + std::promise<std::shared_ptr<std::string>> promise; + getFile(path, + [&](std::shared_ptr<std::string> result) { + promise.set_value(result); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + return promise.get_future().get(); +} + Path BinaryCacheStore::narInfoFileFor(const Path & storePath) { assertStorePath(storePath); @@ -176,17 +191,22 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) sink((unsigned char *) nar->c_str(), nar->size()); } -std::shared_ptr<ValidPathInfo> BinaryCacheStore::queryPathInfoUncached(const Path & storePath) +void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) { auto narInfoFile = narInfoFileFor(storePath); - auto data = getFile(narInfoFile); - if (!data) return 0; - auto narInfo = make_ref<NarInfo>(*this, *data, narInfoFile); + getFile(narInfoFile, + [=](std::shared_ptr<std::string> data) { + if (!data) return success(0); - stats.narInfoRead++; + stats.narInfoRead++; - return std::shared_ptr<NarInfo>(narInfo); + callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>) + std::make_shared<NarInfo>(*this, *data, narInfoFile)); + }, + failure); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index 2d10179f3..41671b7d9 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -31,7 +31,11 @@ protected: /* Return the contents of the specified file, or null if it doesn't exist. */ - virtual std::shared_ptr<std::string> getFile(const std::string & path) = 0; + virtual void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; + + std::shared_ptr<std::string> getFile(const std::string & path); bool wantMassQuery_ = false; int priority = 50; @@ -56,7 +60,9 @@ public: PathSet queryAllValidPaths() override { notImpl(); } - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override diff --git a/src/libstore/download.cc b/src/libstore/download.cc index b2d223da9..ca324595a 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader CurlDownloader & downloader; DownloadRequest request; DownloadResult result; - bool done = false; // whether the promise has been set - std::promise<DownloadResult> promise; + bool done = false; // whether either the success or failure function has been called + std::function<void(const DownloadResult &)> success; + std::function<void(std::exception_ptr exc)> failure; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader if (requestHeaders) curl_slist_free_all(requestHeaders); try { if (!done) - fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri)); + fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)); } catch (...) { ignoreException(); } @@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader template<class T> void fail(const T & e) { - promise.set_exception(std::make_exception_ptr(e)); + assert(!done); done = true; + failure(std::make_exception_ptr(e)); } size_t writeCallback(void * contents, size_t size, size_t nmemb) @@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; - promise.set_value(result); + success(result); done = true; } else { Error err = @@ -253,9 +255,11 @@ struct CurlDownloader : public Downloader attempt++; auto exc = - httpStatus != 0 - ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) - : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); + code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted + ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri) + : httpStatus != 0 + ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) + : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); /* If this is a transient error, then maybe retry the download after a while. */ @@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader { try { workerThreadMain(); - } catch (Interrupted & e) { + } catch (nix::Interrupted & e) { } catch (std::exception & e) { printMsg(lvlError, format("unexpected error in download thread: %s") % e.what()); } @@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader writeFull(wakeupPipe.writeSide.get(), " "); } - std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override + void enqueueDownload(const DownloadRequest & request, + std::function<void(const DownloadResult &)> success, + std::function<void(std::exception_ptr exc)> failure) override { auto item = std::make_shared<DownloadItem>(*this, request); + item->success = success; + item->failure = failure; enqueueItem(item); - return item->promise.get_future(); } }; @@ -458,6 +465,15 @@ ref<Downloader> makeDownloader() return make_ref<CurlDownloader>(); } +std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request) +{ + auto promise = std::make_shared<std::promise<DownloadResult>>(); + enqueueDownload(request, + [promise](const DownloadResult & result) { promise->set_value(result); }, + [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + return promise->get_future(); +} + DownloadResult Downloader::download(const DownloadRequest & request) { return enqueueDownload(request).get(); diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 6b90ff202..82b5d641f 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -23,8 +23,6 @@ struct DownloadRequest struct DownloadResult { - enum Status { Success, NotFound, Forbidden, Misc, Transient }; - Status status; bool cached; std::string etag; std::string effectiveUrl; @@ -38,7 +36,11 @@ struct Downloader /* Enqueue a download request, returning a future to the result of the download. The future may throw a DownloadError exception. */ - virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0; + virtual void enqueueDownload(const DownloadRequest & request, + std::function<void(const DownloadResult &)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; + + std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); @@ -50,7 +52,7 @@ struct Downloader Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "", const Hash & expectedHash = Hash(), string * effectiveUri = nullptr); - enum Error { NotFound, Forbidden, Misc, Transient }; + enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; }; /* Return a shared Downloader object. Using this object is preferred diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 91ee6fcb6..60728de04 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -69,18 +69,27 @@ protected: throw UploadToHTTP("uploading to an HTTP binary cache is not supported"); } - std::shared_ptr<std::string> getFile(const std::string & path) override + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) { DownloadRequest request(cacheUri + "/" + path); request.showProgress = DownloadRequest::no; request.tries = 8; - try { - return getDownloader()->download(request).data; - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return 0; - throw; - } + + getDownloader()->enqueueDownload(request, + [success](const DownloadResult & result) { + success(result.data); + }, + [success, failure](std::exception_ptr exc) { + try { + std::rethrow_exception(exc); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + success(0); + failure(exc); + } + }); } }; diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 91d2650fe..0f377989b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -32,7 +32,19 @@ protected: void upsertFile(const std::string & path, const std::string & data) override; - std::shared_ptr<std::string> getFile(const std::string & path) override; + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override + { + sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + try { + return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); + } catch (SysError & e) { + if (e.errNo == ENOENT) return std::shared_ptr<std::string>(); + throw; + } + }); + } PathSet queryAllValidPaths() override { @@ -76,16 +88,6 @@ void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::stri atomicWrite(binaryCacheDir + "/" + path, data); } -std::shared_ptr<std::string> LocalBinaryCacheStore::getFile(const std::string & path) -{ - try { - return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return 0; - throw; - } -} - static RegisterStoreImplementation regStore([]( const std::string & uri, const Store::Params & params) -> std::shared_ptr<Store> diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 10056f2f1..466cea727 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -577,49 +577,54 @@ Hash parseHashField(const Path & path, const string & s) } -std::shared_ptr<ValidPathInfo> LocalStore::queryPathInfoUncached(const Path & path) +void LocalStore::queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) { - auto info = std::make_shared<ValidPathInfo>(); - info->path = path; + sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { - assertStorePath(path); + auto info = std::make_shared<ValidPathInfo>(); + info->path = path; - return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { - auto state(_state.lock()); + assertStorePath(path); + + return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { + auto state(_state.lock()); - /* Get the path info. */ - auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); + /* Get the path info. */ + auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); - if (!useQueryPathInfo.next()) - return std::shared_ptr<ValidPathInfo>(); + if (!useQueryPathInfo.next()) + return std::shared_ptr<ValidPathInfo>(); - info->id = useQueryPathInfo.getInt(0); + info->id = useQueryPathInfo.getInt(0); - info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); + info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); - info->registrationTime = useQueryPathInfo.getInt(2); + info->registrationTime = useQueryPathInfo.getInt(2); - auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); - if (s) info->deriver = s; + auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); + if (s) info->deriver = s; - /* Note that narSize = NULL yields 0. */ - info->narSize = useQueryPathInfo.getInt(4); + /* Note that narSize = NULL yields 0. */ + info->narSize = useQueryPathInfo.getInt(4); - info->ultimate = useQueryPathInfo.getInt(5) == 1; + info->ultimate = useQueryPathInfo.getInt(5) == 1; - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); - if (s) info->sigs = tokenizeString<StringSet>(s, " "); + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); + if (s) info->sigs = tokenizeString<StringSet>(s, " "); - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); - if (s) info->ca = s; + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); + if (s) info->ca = s; - /* Get the references. */ - auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); + /* Get the references. */ + auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); - while (useQueryReferences.next()) - info->references.insert(useQueryReferences.getStr(0)); + while (useQueryReferences.next()) + info->references.insert(useQueryReferences.getStr(0)); - return info; + return info; + }); }); } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 5b5960cf2..24188130d 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -106,7 +106,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index da654ba0d..0c2c49e55 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -8,66 +8,90 @@ namespace nix { -void Store::computeFSClosure(const Path & path, - PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers) +void Store::computeFSClosure(const Path & startPath, + PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - ThreadPool pool; + struct State + { + size_t pending; + PathSet & paths; + std::exception_ptr exc; + }; - Sync<bool> state_; + Sync<State> state_(State{0, paths_, 0}); - std::function<void(Path)> doPath; + std::function<void(const Path &)> enqueue; - doPath = [&](const Path & path) { + std::condition_variable done; + + enqueue = [&](const Path & path) -> void { { auto state(state_.lock()); - if (paths.count(path)) return; - paths.insert(path); + if (state->exc) return; + if (state->paths.count(path)) return; + state->paths.insert(path); + state->pending++; } - auto info = queryPathInfo(path); + queryPathInfo(path, + [&, path](ref<ValidPathInfo> info) { + // FIXME: calls to isValidPath() should be async - if (flipDirection) { + if (flipDirection) { - PathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + PathSet referrers; + queryReferrers(path, referrers); + for (auto & ref : referrers) + if (ref != path) + enqueue(ref); - if (includeOutputs) { - PathSet derivers = queryValidDerivers(path); - for (auto & i : derivers) - pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs) + for (auto & i : queryValidDerivers(path)) + enqueue(i); - if (includeDerivers && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - pool.enqueue(std::bind(doPath, i)); - } + if (includeDerivers && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + enqueue(i); - } else { + } else { - for (auto & ref : info->references) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); + for (auto & ref : info->references) + if (ref != path) + enqueue(ref); - if (includeOutputs && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i)) pool.enqueue(std::bind(doPath, i)); - } + if (includeOutputs && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i)) enqueue(i); - if (includeDerivers && isValidPath(info->deriver)) - pool.enqueue(std::bind(doPath, info->deriver)); + if (includeDerivers && isValidPath(info->deriver)) + enqueue(info->deriver); - } + } + + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + + }, + + [&, path](std::exception_ptr exc) { + auto state(state_.lock()); + if (!state->exc) state->exc = exc; + assert(state->pending); + if (!--state->pending) done.notify_one(); + }); }; - pool.enqueue(std::bind(doPath, path)); + enqueue(startPath); - pool.process(); + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 94075f3b9..7b73557a5 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -246,36 +246,40 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, } -std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & path) -{ - auto conn(connections->get()); - conn->to << wopQueryPathInfo << path; - try { - conn->processStderr(); - } catch (Error & e) { - // Ugly backwards compatibility hack. - if (e.msg().find("is not valid") != std::string::npos) - throw InvalidPath(e.what()); - throw; - } - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { - bool valid = readInt(conn->from) != 0; - if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); - } - auto info = std::make_shared<ValidPathInfo>(); - info->path = path; - info->deriver = readString(conn->from); - if (info->deriver != "") assertStorePath(info->deriver); - info->narHash = parseHash(htSHA256, readString(conn->from)); - info->references = readStorePaths<PathSet>(*this, conn->from); - info->registrationTime = readInt(conn->from); - info->narSize = readLongLong(conn->from); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - info->ultimate = readInt(conn->from) != 0; - info->sigs = readStrings<StringSet>(conn->from); - info->ca = readString(conn->from); - } - return info; +void RemoteStore::queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) +{ + sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { + auto conn(connections->get()); + conn->to << wopQueryPathInfo << path; + try { + conn->processStderr(); + } catch (Error & e) { + // Ugly backwards compatibility hack. + if (e.msg().find("is not valid") != std::string::npos) + throw InvalidPath(e.what()); + throw; + } + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { + bool valid = readInt(conn->from) != 0; + if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); + } + auto info = std::make_shared<ValidPathInfo>(); + info->path = path; + info->deriver = readString(conn->from); + if (info->deriver != "") assertStorePath(info->deriver); + info->narHash = parseHash(htSHA256, readString(conn->from)); + info->references = readStorePaths<PathSet>(*this, conn->from); + info->registrationTime = readInt(conn->from); + info->narSize = readLongLong(conn->from); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { + info->ultimate = readInt(conn->from) != 0; + info->sigs = readStrings<StringSet>(conn->from); + info->ca = readString(conn->from); + } + return info; + }); } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index e756805ea..9879337d6 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -34,7 +34,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ed95620bb..0722e43d5 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -167,46 +167,50 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore stats.putTimeMs += duration; } - std::shared_ptr<std::string> getFile(const std::string & path) override + void getFile(const std::string & path, + std::function<void(std::shared_ptr<std::string>)> success, + std::function<void(std::exception_ptr exc)> failure) override { - debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); + sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); - request.SetResponseStreamFactory([&]() { - return Aws::New<std::stringstream>("STRINGSTREAM"); - }); + request.SetResponseStreamFactory([&]() { + return Aws::New<std::stringstream>("STRINGSTREAM"); + }); - stats.get++; + stats.get++; - try { + try { - auto now1 = std::chrono::steady_clock::now(); + auto now1 = std::chrono::steady_clock::now(); - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); + auto result = checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); - auto now2 = std::chrono::steady_clock::now(); + auto now2 = std::chrono::steady_clock::now(); - auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); + auto res = dynamic_cast<std::stringstream &>(result.GetBody()).str(); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); - stats.getBytes += res.size(); - stats.getTimeMs += duration; + stats.getBytes += res.size(); + stats.getTimeMs += duration; - return std::make_shared<std::string>(res); + return std::make_shared<std::string>(res); - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; - throw; - } + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>(); + throw; + } + }); } PathSet queryAllValidPaths() override diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 5dd56f905..427170843 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -4,6 +4,8 @@ #include "util.hh" #include "nar-info-disk-cache.hh" +#include <future> + namespace nix { @@ -283,51 +285,79 @@ bool Store::isValidPath(const Path & storePath) ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath) { + std::promise<ref<ValidPathInfo>> promise; + + queryPathInfo(storePath, + [&](ref<ValidPathInfo> info) { + promise.set_value(info); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + + return promise.get_future().get(); +} + + +void Store::queryPathInfo(const Path & storePath, + std::function<void(ref<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) +{ auto hashPart = storePathToHash(storePath); - { - auto state_(state.lock()); - auto res = state_->pathInfoCache.get(hashPart); - if (res) { - stats.narInfoReadAverted++; - if (!*res) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref<ValidPathInfo>(*res); + try { + + { + auto res = state.lock()->pathInfoCache.get(hashPart); + if (res) { + stats.narInfoReadAverted++; + if (!*res) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + return success(ref<ValidPathInfo>(*res)); + } } - } - if (diskCache) { - auto res = diskCache->lookupNarInfo(getUri(), hashPart); - if (res.first != NarInfoDiskCache::oUnknown) { - stats.narInfoReadAverted++; - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, - res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); - if (res.first == NarInfoDiskCache::oInvalid || - (res.second->path != storePath && storePathToName(storePath) != "")) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref<ValidPathInfo>(res.second); + if (diskCache) { + auto res = diskCache->lookupNarInfo(getUri(), hashPart); + if (res.first != NarInfoDiskCache::oUnknown) { + stats.narInfoReadAverted++; + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, + res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); + if (res.first == NarInfoDiskCache::oInvalid || + (res.second->path != storePath && storePathToName(storePath) != "")) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + } + return success(ref<ValidPathInfo>(res.second)); + } } + + } catch (std::exception & e) { + return callFailure(failure); } - auto info = queryPathInfoUncached(storePath); + queryPathInfoUncached(storePath, + [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) { - if (diskCache) - diskCache->upsertNarInfo(getUri(), hashPart, info); + if (diskCache) + diskCache->upsertNarInfo(getUri(), hashPart, info); - { - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, info); - } + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, info); + } - if (!info - || (info->path != storePath && storePathToName(storePath) != "")) - { - stats.narInfoMissing++; - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - } + if (!info + || (info->path != storePath && storePathToName(storePath) != "")) + { + stats.narInfoMissing++; + return failure(std::make_exception_ptr(InvalidPath(format("path ‘%s’ is not valid") % storePath))); + } + + callSuccess(success, failure, ref<ValidPathInfo>(info)); - return ref<ValidPathInfo>(info); + }, failure); } diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index 41fc58fc4..cba4deaad 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -319,9 +319,16 @@ public: the name part of the store path. */ ref<const ValidPathInfo> queryPathInfo(const Path & path); + /* Asynchronous version of queryPathInfo(). */ + void queryPathInfo(const Path & path, + std::function<void(ref<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure); + protected: - virtual std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) = 0; + virtual void queryPathInfoUncached(const Path & path, + std::function<void(std::shared_ptr<ValidPathInfo>)> success, + std::function<void(std::exception_ptr exc)> failure) = 0; public: diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 8e029fb48..1750e0373 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -1215,4 +1215,15 @@ string base64Decode(const string & s) } +void callFailure(const std::function<void(std::exception_ptr exc)> & failure) +{ + try { + failure(std::current_exception()); + } catch (std::exception & e) { + printMsg(lvlError, format("uncaught exception: %s") % e.what()); + abort(); + } +} + + } diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 9bf548326..182a38fb3 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -376,4 +376,43 @@ string get(const T & map, const string & key, const string & def = "") } +/* Call ‘failure’ with the current exception as argument. If ‘failure’ + throws an exception, abort the program. */ +void callFailure(const std::function<void(std::exception_ptr exc)> & failure); + + +/* Evaluate the function ‘f’. If it returns a value, call ‘success’ + with that value as its argument. If it or ‘success’ throws an + exception, call ‘failure’. If ‘failure’ throws an exception, abort + the program. */ +template<class T> +void sync2async( + const std::function<void(T)> & success, + const std::function<void(std::exception_ptr exc)> & failure, + const std::function<T()> & f) +{ + try { + success(f()); + } catch (...) { + callFailure(failure); + } +} + + +/* Call the function ‘success’. If it throws an exception, call + ‘failure’. If that throws an exception, abort the program. */ +template<class T> +void callSuccess( + const std::function<void(T)> & success, + const std::function<void(std::exception_ptr exc)> & failure, + T && arg) +{ + try { + success(arg); + } catch (...) { + callFailure(failure); + } +} + + } |