diff options
Diffstat (limited to 'src/libstore')
-rw-r--r-- | src/libstore/binary-cache-store.cc | 37 | ||||
-rw-r--r-- | src/libstore/binary-cache-store.hh | 6 | ||||
-rw-r--r-- | src/libstore/download.cc | 37 | ||||
-rw-r--r-- | src/libstore/download.hh | 3 | ||||
-rw-r--r-- | src/libstore/http-binary-cache-store.cc | 18 | ||||
-rw-r--r-- | src/libstore/legacy-ssh-store.cc | 11 | ||||
-rw-r--r-- | src/libstore/local-binary-cache-store.cc | 17 | ||||
-rw-r--r-- | src/libstore/local-store.cc | 13 | ||||
-rw-r--r-- | src/libstore/local-store.hh | 3 | ||||
-rw-r--r-- | src/libstore/misc.cc | 17 | ||||
-rw-r--r-- | src/libstore/remote-store.cc | 9 | ||||
-rw-r--r-- | src/libstore/remote-store.hh | 3 | ||||
-rw-r--r-- | src/libstore/s3-binary-cache-store.cc | 9 | ||||
-rw-r--r-- | src/libstore/store-api.cc | 88 | ||||
-rw-r--r-- | src/libstore/store-api.hh | 6 |
15 files changed, 131 insertions, 146 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 2e9a13e56..45be49076 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -58,12 +58,13 @@ std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) { std::promise<std::shared_ptr<std::string>> promise; getFile(path, - [&](std::shared_ptr<std::string> result) { - promise.set_value(result); - }, - [&](std::exception_ptr exc) { - promise.set_exception(exc); - }); + {[&](std::future<std::shared_ptr<std::string>> result) { + try { + promise.set_value(result.get()); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }}); return promise.get_future().get(); } @@ -218,8 +219,7 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) } void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { auto uri = getUri(); auto act = std::make_shared<Activity>(*logger, lvlTalkative, actQueryPathInfo, @@ -229,17 +229,22 @@ void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, auto narInfoFile = narInfoFileFor(storePath); getFile(narInfoFile, - [=](std::shared_ptr<std::string> data) { - if (!data) return success(0); + {[=](std::future<std::shared_ptr<std::string>> fut) { + try { + auto data = fut.get(); + + if (!data) return callback(nullptr); - stats.narInfoRead++; + stats.narInfoRead++; - callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>) - std::make_shared<NarInfo>(*this, *data, narInfoFile)); + callback((std::shared_ptr<ValidPathInfo>) + std::make_shared<NarInfo>(*this, *data, narInfoFile)); - (void) act; // force Activity into this lambda to ensure it stays alive - }, - failure); + (void) act; // force Activity into this lambda to ensure it stays alive + } catch (...) { + callback.rethrow(); + } + }}); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index e20b96844..fcde666be 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -41,8 +41,7 @@ public: /* Return the contents of the specified file, or null if it doesn't exist. */ virtual void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) = 0; + Callback<std::shared_ptr<std::string>> callback) = 0; std::shared_ptr<std::string> getFile(const std::string & path); @@ -71,8 +70,7 @@ public: { unsupported(); } void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override diff --git a/src/libstore/download.cc b/src/libstore/download.cc index fce701a15..afb066e14 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -81,8 +81,7 @@ struct CurlDownloader : public Downloader DownloadResult result; Activity act; bool done = false; // whether either the success or failure function has been called - std::function<void(const DownloadResult &)> success; - std::function<void(std::exception_ptr exc)> failure; + Callback<DownloadResult> callback; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -97,10 +96,13 @@ struct CurlDownloader : public Downloader std::string encoding; - DownloadItem(CurlDownloader & downloader, const DownloadRequest & request) + DownloadItem(CurlDownloader & downloader, + const DownloadRequest & request, + Callback<DownloadResult> callback) : downloader(downloader) , request(request) , act(*logger, lvlTalkative, actDownload, fmt("downloading '%s'", request.uri), {request.uri}, request.parentAct) + , callback(callback) { if (!request.expectedETag.empty()) requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); @@ -129,7 +131,7 @@ struct CurlDownloader : public Downloader { assert(!done); done = true; - callFailure(failure, std::make_exception_ptr(e)); + callback.rethrow(std::make_exception_ptr(e)); } size_t writeCallback(void * contents, size_t size, size_t nmemb) @@ -316,11 +318,11 @@ struct CurlDownloader : public Downloader try { if (request.decompress) result.data = decodeContent(encoding, ref<std::string>(result.data)); - callSuccess(success, failure, const_cast<const DownloadResult &>(result)); act.progress(result.data->size(), result.data->size()); + callback(std::move(result)); } catch (...) { done = true; - callFailure(failure, std::current_exception()); + callback.rethrow(); } } else { // We treat most errors as transient, but won't retry when hopeless @@ -570,13 +572,12 @@ struct CurlDownloader : public Downloader } void enqueueDownload(const DownloadRequest & request, - std::function<void(const DownloadResult &)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<DownloadResult> callback) override { /* Ugly hack to support s3:// URIs. */ if (hasPrefix(request.uri, "s3://")) { // FIXME: do this on a worker thread - sync2async<DownloadResult>(success, failure, [&]() -> DownloadResult { + try { #ifdef ENABLE_S3 S3Helper s3Helper("", Aws::Region::US_EAST_1); // FIXME: make configurable auto slash = request.uri.find('/', 5); @@ -590,18 +591,15 @@ struct CurlDownloader : public Downloader if (!s3Res.data) throw DownloadError(NotFound, fmt("S3 object '%s' does not exist", request.uri)); res.data = s3Res.data; - return res; + callback(std::move(res)); #else throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri); #endif - }); + } catch (...) { callback.rethrow(); } return; } - auto item = std::make_shared<DownloadItem>(*this, request); - item->success = success; - item->failure = failure; - enqueueItem(item); + enqueueItem(std::make_shared<DownloadItem>(*this, request, callback)); } }; @@ -622,8 +620,13 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & { auto promise = std::make_shared<std::promise<DownloadResult>>(); enqueueDownload(request, - [promise](const DownloadResult & result) { promise->set_value(result); }, - [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + {[promise](std::future<DownloadResult> fut) { + try { + promise->set_value(fut.get()); + } catch (...) { + promise->set_exception(std::current_exception()); + } + }}); return promise->get_future(); } diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 7ade756fc..01940f544 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -42,8 +42,7 @@ struct Downloader the download. The future may throw a DownloadError exception. */ virtual void enqueueDownload(const DownloadRequest & request, - std::function<void(const DownloadResult &)> success, - std::function<void(std::exception_ptr exc)> failure) = 0; + Callback<DownloadResult> callback) = 0; std::future<DownloadResult> enqueueDownload(const DownloadRequest & request); diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index b9e9cd5da..b8d670417 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -78,27 +78,23 @@ protected: } void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<std::shared_ptr<std::string>> callback) override { DownloadRequest request(cacheUri + "/" + path); request.tries = 8; getDownloader()->enqueueDownload(request, - [success](const DownloadResult & result) { - success(result.data); - }, - [success, failure](std::exception_ptr exc) { + {[callback](std::future<DownloadResult> result) { try { - std::rethrow_exception(exc); + callback(result.get().data); } catch (DownloadError & e) { if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return success(0); - failure(exc); + return callback(std::shared_ptr<std::string>()); + callback.rethrow(); } catch (...) { - failure(exc); + callback.rethrow(); } - }); + }}); } }; diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index 5dee25308..02d91ded0 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -84,10 +84,9 @@ struct LegacySSHStore : public Store } void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<std::shared_ptr<ValidPathInfo>> callback) override { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() -> std::shared_ptr<ValidPathInfo> { + try { auto conn(connections->get()); debug("querying remote host '%s' for info on '%s'", host, path); @@ -97,7 +96,7 @@ struct LegacySSHStore : public Store auto info = std::make_shared<ValidPathInfo>(); conn->from >> info->path; - if (info->path.empty()) return nullptr; + if (info->path.empty()) return callback(nullptr); assert(path == info->path); PathSet references; @@ -116,8 +115,8 @@ struct LegacySSHStore : public Store auto s = readString(conn->from); assert(s == ""); - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } void addToStore(const ValidPathInfo & info, Source & source, diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 2577e90ae..ae0ffa6a5 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -35,17 +35,14 @@ protected: const std::string & mimeType) override; void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<std::shared_ptr<std::string>> callback) override { - sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { - try { - return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return std::shared_ptr<std::string>(); - throw; - } - }); + try { + // FIXME: O(n) space + callback(std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path))); + } catch (SysError & e) { + if (e.errNo == ENOENT) callback(nullptr); else callback.rethrow(); + } catch (...) { callback.rethrow(); } } PathSet queryAllValidPaths() override diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 681abafef..3b2ba65f3 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -629,17 +629,15 @@ uint64_t LocalStore::addValidPath(State & state, void LocalStore::queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { - + try { auto info = std::make_shared<ValidPathInfo>(); info->path = path; assertStorePath(path); - return retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { + callback(retrySQLite<std::shared_ptr<ValidPathInfo>>([&]() { auto state(_state.lock()); /* Get the path info. */ @@ -679,8 +677,9 @@ void LocalStore::queryPathInfoUncached(const Path & path, info->references.insert(useQueryReferences.getStr(0)); return info; - }); - }); + })); + + } catch (...) { callback.rethrow(); } } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 1209a0635..746bdbeed 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -127,8 +127,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index a82aa4e9c..adcce026f 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -33,9 +33,11 @@ void Store::computeFSClosure(const PathSet & startPaths, state->pending++; } - queryPathInfo(path, - [&, path](ref<ValidPathInfo> info) { - // FIXME: calls to isValidPath() should be async + queryPathInfo(path, {[&, path](std::future<ref<ValidPathInfo>> fut) { + // FIXME: calls to isValidPath() should be async + + try { + auto info = fut.get(); if (flipDirection) { @@ -75,14 +77,13 @@ void Store::computeFSClosure(const PathSet & startPaths, if (!--state->pending) done.notify_one(); } - }, - - [&, path](std::exception_ptr exc) { + } catch (...) { auto state(state_.lock()); - if (!state->exc) state->exc = exc; + if (!state->exc) state->exc = std::current_exception(); assert(state->pending); if (!--state->pending) done.notify_one(); - }); + }; + }}); }; for (auto & startPath : startPaths) diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index a157c6c48..b72c940cd 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -294,10 +294,9 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, void RemoteStore::queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<std::shared_ptr<ValidPathInfo>> callback) { - sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() { + try { auto conn(connections->get()); conn->to << wopQueryPathInfo << path; try { @@ -324,8 +323,8 @@ void RemoteStore::queryPathInfoUncached(const Path & path, info->sigs = readStrings<StringSet>(conn->from); conn->from >> info->ca; } - return info; - }); + callback(std::move(info)); + } catch (...) { callback.rethrow(); } } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 95fa59a20..b488e34ce 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -40,8 +40,7 @@ public: PathSet queryAllValidPaths() override; void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) override; + Callback<std::shared_ptr<ValidPathInfo>> callback) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 103f141a1..f2e8efc16 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -365,10 +365,9 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore } void getFile(const std::string & path, - std::function<void(std::shared_ptr<std::string>)> success, - std::function<void(std::exception_ptr exc)> failure) override + Callback<std::shared_ptr<std::string>> callback) override { - sync2async<std::shared_ptr<std::string>>(success, failure, [&]() { + try { stats.get++; auto res = s3Helper.getObject(bucketName, path); @@ -380,8 +379,8 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", bucketName, path, res.data->size(), res.durationMs); - return res.data; - }); + callback(std::move(res.data)); + } catch (...) { callback.rethrow(); } } PathSet queryAllValidPaths() override diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 49b32d115..9b0b7d632 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -305,20 +305,20 @@ ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath) std::promise<ref<ValidPathInfo>> promise; queryPathInfo(storePath, - [&](ref<ValidPathInfo> info) { - promise.set_value(info); - }, - [&](std::exception_ptr exc) { - promise.set_exception(exc); - }); + {[&](std::future<ref<ValidPathInfo>> result) { + try { + promise.set_value(result.get()); + } catch (...) { + promise.set_exception(std::current_exception()); + } + }}); return promise.get_future().get(); } void Store::queryPathInfo(const Path & storePath, - std::function<void(ref<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) + Callback<ref<ValidPathInfo>> callback) { auto hashPart = storePathToHash(storePath); @@ -330,7 +330,7 @@ void Store::queryPathInfo(const Path & storePath, stats.narInfoReadAverted++; if (!*res) throw InvalidPath(format("path '%s' is not valid") % storePath); - return success(ref<ValidPathInfo>(*res)); + return callback(ref<ValidPathInfo>(*res)); } } @@ -346,35 +346,36 @@ void Store::queryPathInfo(const Path & storePath, (res.second->path != storePath && storePathToName(storePath) != "")) throw InvalidPath(format("path '%s' is not valid") % storePath); } - return success(ref<ValidPathInfo>(res.second)); + return callback(ref<ValidPathInfo>(res.second)); } } - } catch (std::exception & e) { - return callFailure(failure); - } + } catch (...) { return callback.rethrow(); } queryPathInfoUncached(storePath, - [this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) { + {[this, storePath, hashPart, callback](std::future<std::shared_ptr<ValidPathInfo>> fut) { - if (diskCache) - diskCache->upsertNarInfo(getUri(), hashPart, info); + try { + auto info = fut.get(); - { - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, info); - } + if (diskCache) + diskCache->upsertNarInfo(getUri(), hashPart, info); - if (!info - || (info->path != storePath && storePathToName(storePath) != "")) - { - stats.narInfoMissing++; - return failure(std::make_exception_ptr(InvalidPath(format("path '%s' is not valid") % storePath))); - } + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, info); + } - callSuccess(success, failure, ref<ValidPathInfo>(info)); + if (!info + || (info->path != storePath && storePathToName(storePath) != "")) + { + stats.narInfoMissing++; + throw InvalidPath("path '%s' is not valid", storePath); + } - }, failure); + callback(ref<ValidPathInfo>(info)); + } catch (...) { callback.rethrow(); } + }}); } @@ -394,26 +395,19 @@ PathSet Store::queryValidPaths(const PathSet & paths, SubstituteFlag maybeSubsti auto doQuery = [&](const Path & path ) { checkInterrupt(); - queryPathInfo(path, - [path, &state_, &wakeup](ref<ValidPathInfo> info) { - auto state(state_.lock()); + queryPathInfo(path, {[path, &state_, &wakeup](std::future<ref<ValidPathInfo>> fut) { + auto state(state_.lock()); + try { + auto info = fut.get(); state->valid.insert(path); - assert(state->left); - if (!--state->left) - wakeup.notify_one(); - }, - [path, &state_, &wakeup](std::exception_ptr exc) { - auto state(state_.lock()); - try { - std::rethrow_exception(exc); - } catch (InvalidPath &) { - } catch (...) { - state->exc = exc; - } - assert(state->left); - if (!--state->left) - wakeup.notify_one(); - }); + } catch (InvalidPath &) { + } catch (...) { + state->exc = std::current_exception(); + } + assert(state->left); + if (!--state->left) + wakeup.notify_one(); + }}); }; for (auto & path : paths) diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index ea259f07e..6ee2d5506 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -355,14 +355,12 @@ public: /* Asynchronous version of queryPathInfo(). */ void queryPathInfo(const Path & path, - std::function<void(ref<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure); + Callback<ref<ValidPathInfo>> callback); protected: virtual void queryPathInfoUncached(const Path & path, - std::function<void(std::shared_ptr<ValidPathInfo>)> success, - std::function<void(std::exception_ptr exc)> failure) = 0; + Callback<std::shared_ptr<ValidPathInfo>> callback) = 0; public: |