diff options
-rw-r--r-- | src/libstore/crypto.cc | 12 | ||||
-rw-r--r-- | src/libstore/crypto.hh | 2 | ||||
-rw-r--r-- | src/libutil/thread-pool.cc | 81 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 52 | ||||
-rw-r--r-- | src/libutil/util.cc | 2 | ||||
-rw-r--r-- | src/libutil/util.hh | 2 | ||||
-rw-r--r-- | src/nix/command.cc | 21 | ||||
-rw-r--r-- | src/nix/command.hh | 17 | ||||
-rw-r--r-- | src/nix/progress-bar.cc | 72 | ||||
-rw-r--r-- | src/nix/progress-bar.hh | 49 | ||||
-rw-r--r-- | src/nix/verify.cc | 124 |
11 files changed, 432 insertions, 2 deletions
diff --git a/src/libstore/crypto.cc b/src/libstore/crypto.cc index caba22c1e..94c582d65 100644 --- a/src/libstore/crypto.cc +++ b/src/libstore/crypto.cc @@ -1,5 +1,6 @@ #include "crypto.hh" #include "util.hh" +#include "globals.hh" #if HAVE_SODIUM #include <sodium.h> @@ -98,4 +99,15 @@ bool verifyDetached(const std::string & data, const std::string & sig, #endif } +PublicKeys getDefaultPublicKeys() +{ + PublicKeys publicKeys; + for (auto s : settings.get("binary-cache-public-keys", Strings())) { + PublicKey key(s); + publicKeys.emplace(key.name, key); + // FIXME: filter duplicates + } + return publicKeys; +} + } diff --git a/src/libstore/crypto.hh b/src/libstore/crypto.hh index 38d5fe2a8..9110af3aa 100644 --- a/src/libstore/crypto.hh +++ b/src/libstore/crypto.hh @@ -49,4 +49,6 @@ typedef std::map<std::string, PublicKey> PublicKeys; bool verifyDetached(const std::string & data, const std::string & sig, const PublicKeys & publicKeys); +PublicKeys getDefaultPublicKeys(); + } diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc new file mode 100644 index 000000000..819aed748 --- /dev/null +++ b/src/libutil/thread-pool.cc @@ -0,0 +1,81 @@ +#include "thread-pool.hh" + +namespace nix { + +ThreadPool::ThreadPool(size_t _nrThreads) + : nrThreads(_nrThreads) +{ + if (!nrThreads) { + nrThreads = std::thread::hardware_concurrency(); + if (!nrThreads) nrThreads = 1; + } +} + +void ThreadPool::enqueue(const work_t & t) +{ + auto state_(state.lock()); + state_->left.push(t); + wakeup.notify_one(); +} + +void ThreadPool::process() +{ + printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads); + + std::vector<std::thread> workers; + + for (size_t n = 0; n < nrThreads; n++) + workers.push_back(std::thread([&]() { + bool first = true; + + while (true) { + work_t work; + { + auto state_(state.lock()); + if (state_->exception) return; + if (!first) { + assert(state_->pending); + state_->pending--; + } + first = false; + while (state_->left.empty()) { + if (!state_->pending) { + wakeup.notify_all(); + return; + } + if (state_->exception) return; + state_.wait(wakeup); + } + work = state_->left.front(); + state_->left.pop(); + state_->pending++; + } + + try { + work(); + } catch (std::exception & e) { + auto state_(state.lock()); + if (state_->exception) + printMsg(lvlError, format("error: %s") % e.what()); + else { + state_->exception = std::current_exception(); + wakeup.notify_all(); + } + } + } + + })); + + for (auto & thr : workers) + thr.join(); + + { + auto state_(state.lock()); + if (state_->exception) + std::rethrow_exception(state_->exception); + } +} + +} + + diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh new file mode 100644 index 000000000..77641d88b --- /dev/null +++ b/src/libutil/thread-pool.hh @@ -0,0 +1,52 @@ +#pragma once + +#include "sync.hh" +#include "util.hh" + +#include <queue> +#include <functional> +#include <thread> + +namespace nix { + +/* A simple thread pool that executes a queue of work items + (lambdas). */ +class ThreadPool +{ +public: + + ThreadPool(size_t nrThreads = 0); + + // FIXME: use std::packaged_task? + typedef std::function<void()> work_t; + + /* Enqueue a function to be executed by the thread pool. */ + void enqueue(const work_t & t); + + /* Execute work items until the queue is empty. Note that work + items are allowed to add new items to the queue; this is + handled correctly. Queue processing stops prematurely if any + work item throws an exception. This exception is propagated to + the calling thread. If multiple work items throw an exception + concurrently, only one item is propagated; the others are + printed on stderr and otherwise ignored. */ + void process(); + +private: + + size_t nrThreads; + + struct State + { + std::queue<work_t> left; + size_t pending = 0; + std::exception_ptr exception; + }; + + Sync<State> state; + + std::condition_variable wakeup; + +}; + +} diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 25246a3e8..d4ac3fb60 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -548,7 +548,7 @@ void writeToStderr(const string & s) } -void (*_writeToStderr) (const unsigned char * buf, size_t count) = 0; +std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; void readFull(int fd, unsigned char * buf, size_t count) diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 3606f6ec9..0a72cc592 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -167,7 +167,7 @@ void warnOnce(bool & haveWarned, const FormatOrString & fs); void writeToStderr(const string & s); -extern void (*_writeToStderr) (const unsigned char * buf, size_t count); +extern std::function<void(const unsigned char * buf, size_t count)> _writeToStderr; /* Wrappers arount read()/write() that read/write exactly the diff --git a/src/nix/command.cc b/src/nix/command.cc index 0c2103392..a89246a93 100644 --- a/src/nix/command.cc +++ b/src/nix/command.cc @@ -69,4 +69,25 @@ void StoreCommand::run() run(openStoreAt(storeUri)); } +StorePathsCommand::StorePathsCommand() +{ + expectArgs("paths", &storePaths); + mkFlag('r', "recursive", "apply operation to closure of the specified paths", &recursive); +} + +void StorePathsCommand::run(ref<Store> store) +{ + for (auto & storePath : storePaths) + storePath = followLinksToStorePath(storePath); + + if (recursive) { + PathSet closure; + for (auto & storePath : storePaths) + store->computeFSClosure(storePath, closure, false, false); + storePaths = store->topoSortPaths(closure); + } + + run(store, storePaths); +} + } diff --git a/src/nix/command.hh b/src/nix/command.hh index 5bf391d93..8397244ca 100644 --- a/src/nix/command.hh +++ b/src/nix/command.hh @@ -24,6 +24,23 @@ struct StoreCommand : virtual Command virtual void run(ref<Store>) = 0; }; +/* A command that operates on zero or more store paths. */ +struct StorePathsCommand : public StoreCommand +{ +private: + + Paths storePaths; + bool recursive = false; + +public: + + StorePathsCommand(); + + virtual void run(ref<Store> store, Paths storePaths) = 0; + + void run(ref<Store> store) override; +}; + typedef std::map<std::string, ref<Command>> Commands; /* An argument parser that supports multiple subcommands, diff --git a/src/nix/progress-bar.cc b/src/nix/progress-bar.cc new file mode 100644 index 000000000..ed7b578e2 --- /dev/null +++ b/src/nix/progress-bar.cc @@ -0,0 +1,72 @@ +#include "progress-bar.hh" + +#include <iostream> + +namespace nix { + +ProgressBar::ProgressBar() +{ + _writeToStderr = [&](const unsigned char * buf, size_t count) { + auto state_(state.lock()); + assert(!state_->done); + std::cerr << "\r\e[K" << std::string((const char *) buf, count); + render(*state_); + }; +} + +ProgressBar::~ProgressBar() +{ + done(); +} + +void ProgressBar::updateStatus(const std::string & s) +{ + auto state_(state.lock()); + assert(!state_->done); + state_->status = s; + render(*state_); +} + +void ProgressBar::done() +{ + auto state_(state.lock()); + assert(state_->activities.empty()); + state_->done = true; + std::cerr << "\r\e[K"; + std::cerr.flush(); + _writeToStderr = decltype(_writeToStderr)(); +} + +void ProgressBar::render(State & state_) +{ + std::cerr << '\r' << state_.status; + if (!state_.activities.empty()) { + if (!state_.status.empty()) std::cerr << ' '; + std::cerr << *state_.activities.rbegin(); + } + std::cerr << "\e[K"; + std::cerr.flush(); +} + + +ProgressBar::Activity ProgressBar::startActivity(const FormatOrString & fs) +{ + return Activity(*this, fs); +} + +ProgressBar::Activity::Activity(ProgressBar & pb, const FormatOrString & fs) + : pb(pb) +{ + auto state_(pb.state.lock()); + state_->activities.push_back(fs.s); + it = state_->activities.end(); --it; + pb.render(*state_); +} + +ProgressBar::Activity::~Activity() +{ + auto state_(pb.state.lock()); + state_->activities.erase(it); +} + +} diff --git a/src/nix/progress-bar.hh b/src/nix/progress-bar.hh new file mode 100644 index 000000000..2dda24346 --- /dev/null +++ b/src/nix/progress-bar.hh @@ -0,0 +1,49 @@ +#pragma once + +#include "sync.hh" +#include "util.hh" + +namespace nix { + +class ProgressBar +{ +private: + struct State + { + std::string status; + bool done = false; + std::list<std::string> activities; + }; + + Sync<State> state; + +public: + + ProgressBar(); + + ~ProgressBar(); + + void updateStatus(const std::string & s); + + void done(); + + class Activity + { + friend class ProgressBar; + private: + ProgressBar & pb; + std::list<std::string>::iterator it; + Activity(ProgressBar & pb, const FormatOrString & fs); + public: + ~Activity(); + }; + + Activity startActivity(const FormatOrString & fs); + +private: + + void render(State & state_); + +}; + +} diff --git a/src/nix/verify.cc b/src/nix/verify.cc new file mode 100644 index 000000000..ef3e9fcc2 --- /dev/null +++ b/src/nix/verify.cc @@ -0,0 +1,124 @@ +#include "affinity.hh" // FIXME +#include "command.hh" +#include "progress-bar.hh" +#include "shared.hh" +#include "store-api.hh" +#include "sync.hh" +#include "thread-pool.hh" + +#include <atomic> + +using namespace nix; + +struct CmdVerifyPaths : StorePathsCommand +{ + bool noContents = false; + bool noSigs = false; + + CmdVerifyPaths() + { + mkFlag(0, "no-contents", "do not verify the contents of each store path", &noContents); + mkFlag(0, "no-sigs", "do not verify whether each store path has a valid signature", &noSigs); + } + + std::string name() override + { + return "verify-paths"; + } + + std::string description() override + { + return "verify the integrity of store paths"; + } + + void run(ref<Store> store, Paths storePaths) override + { + restoreAffinity(); // FIXME + + auto publicKeys = getDefaultPublicKeys(); + + std::atomic<size_t> untrusted{0}; + std::atomic<size_t> corrupted{0}; + std::atomic<size_t> done{0}; + std::atomic<size_t> failed{0}; + + ProgressBar progressBar; + + auto showProgress = [&](bool final) { + std::string s; + if (final) + s = (format("checked %d paths") % storePaths.size()).str(); + else + s = (format("[%d/%d checked") % done % storePaths.size()).str(); + if (corrupted > 0) + s += (format(", %d corrupted") % corrupted).str(); + if (untrusted > 0) + s += (format(", %d untrusted") % untrusted).str(); + if (failed > 0) + s += (format(", %d failed") % failed).str(); + if (!final) s += "]"; + return s; + }; + + progressBar.updateStatus(showProgress(false)); + + ThreadPool pool; + + auto doPath = [&](const Path & storePath) { + try { + progressBar.startActivity(format("checking ‘%s’") % storePath); + + auto info = store->queryPathInfo(storePath); + + if (!noContents) { + + HashSink sink(info.narHash.type); + store->narFromPath(storePath, sink); + + auto hash = sink.finish(); + + if (hash.first != info.narHash) { + corrupted = 1; + printMsg(lvlError, + format("path ‘%s’ was modified! expected hash ‘%s’, got ‘%s’") + % storePath % printHash(info.narHash) % printHash(hash.first)); + } + + } + + if (!noSigs) { + + if (!info.ultimate && !info.checkSignatures(publicKeys)) { + untrusted++; + printMsg(lvlError, format("path ‘%s’ is untrusted") % storePath); + } + + } + + done++; + + progressBar.updateStatus(showProgress(false)); + + } catch (Error & e) { + printMsg(lvlError, format(ANSI_RED "error:" ANSI_NORMAL " %s") % e.what()); + failed++; + } + }; + + for (auto & storePath : storePaths) + pool.enqueue(std::bind(doPath, storePath)); + + pool.process(); + + progressBar.done(); + + printMsg(lvlInfo, showProgress(true)); + + throw Exit( + (corrupted ? 1 : 0) | + (untrusted ? 2 : 0) | + (failed ? 4 : 0)); + } +}; + +static RegisterCommand r1(make_ref<CmdVerifyPaths>()); |