diff options
author | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-02-24 11:39:56 +0100 |
---|---|---|
committer | Eelco Dolstra <eelco.dolstra@logicblox.com> | 2016-02-24 11:39:56 +0100 |
commit | 5f862658c3f8e518fb631d0536f2b38f107970e1 (patch) | |
tree | 34a38a2405fda244f547b06c6db264f3986e1aab | |
parent | d5626bf4c14f725136f2c5b6ac8bf818627352f0 (diff) |
Remove bad daemon connections from the pool
This is necessary for long-running processes like hydra-queue-runner:
if a nix-daemon worker is killed, we need to stop reusing that
connection.
-rw-r--r-- | src/libstore/remote-store.cc | 6 | ||||
-rw-r--r-- | src/libutil/pool.hh | 18 | ||||
-rw-r--r-- | src/libutil/serialise.cc | 22 | ||||
-rw-r--r-- | src/libutil/serialise.hh | 23 |
4 files changed, 55 insertions, 14 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index f6ec3fffb..2f540c640 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -40,7 +40,11 @@ template PathSet readStorePaths(Source & from); RemoteStore::RemoteStore(size_t maxConnections) - : connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); })) + : connections(make_ref<Pool<Connection>>( + maxConnections, + [this]() { return openConnection(); }, + [](const ref<Connection> & r) { return r->to.good() && r->from.good(); } + )) { } diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh index 4b865a193..b75a3cbf8 100644 --- a/src/libutil/pool.hh +++ b/src/libutil/pool.hh @@ -33,11 +33,17 @@ class Pool { public: + /* A function that produces new instances of R on demand. */ typedef std::function<ref<R>()> Factory; + /* A function that checks whether an instance of R is still + usable. Unusable instances are removed from the pool. */ + typedef std::function<bool(const ref<R> &)> Validator; + private: Factory factory; + Validator validator; struct State { @@ -53,8 +59,10 @@ private: public: Pool(size_t max = std::numeric_limits<size_t>::max, - const Factory & factory = []() { return make_ref<R>(); }) + const Factory & factory = []() { return make_ref<R>(); }, + const Validator & validator = [](ref<R> r) { return true; }) : factory(factory) + , validator(validator) { auto state_(state.lock()); state_->max = max; @@ -109,11 +117,13 @@ public: while (state_->idle.empty() && state_->inUse >= state_->max) state_.wait(wakeup); - if (!state_->idle.empty()) { + while (!state_->idle.empty()) { auto p = state_->idle.back(); state_->idle.pop_back(); - state_->inUse++; - return Handle(*this, p); + if (validator(p)) { + state_->inUse++; + return Handle(*this, p); + } } state_->inUse++; diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index f136a1324..c9620e2bf 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len) warned = true; } } - writeFull(fd, data, len); + try { + writeFull(fd, data, len); + } catch (SysError & e) { + _good = true; + } +} + + +bool FdSink::good() +{ + return _good; } @@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len) checkInterrupt(); n = ::read(fd, (char *) data, bufSize); } while (n == -1 && errno == EINTR); - if (n == -1) throw SysError("reading from file"); - if (n == 0) throw EndOfFile("unexpected end-of-file"); + if (n == -1) { _good = false; throw SysError("reading from file"); } + if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); } return n; } +bool FdSource::good() +{ + return _good; +} + + size_t StringSource::read(unsigned char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 979ff849f..9e269f392 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -12,6 +12,7 @@ struct Sink { virtual ~Sink() { } virtual void operator () (const unsigned char * data, size_t len) = 0; + virtual bool good() { return true; } }; @@ -25,7 +26,7 @@ struct BufferedSink : Sink : bufSize(bufSize), bufPos(0), buffer(0) { } ~BufferedSink(); - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; void flush(); @@ -47,6 +48,8 @@ struct Source return the number of bytes stored. If blocks until at least one byte is available. */ virtual size_t read(unsigned char * data, size_t len) = 0; + + virtual bool good() { return true; } }; @@ -60,7 +63,7 @@ struct BufferedSource : Source : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { } ~BufferedSource(); - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; /* Underlying read call, to be overridden. */ virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0; @@ -80,7 +83,12 @@ struct FdSink : BufferedSink FdSink(int fd) : fd(fd), warn(false), written(0) { } ~FdSink(); - void write(const unsigned char * data, size_t len); + void write(const unsigned char * data, size_t len) override; + + bool good() override; + +private: + bool _good = true; }; @@ -90,7 +98,10 @@ struct FdSource : BufferedSource int fd; FdSource() : fd(-1) { } FdSource(int fd) : fd(fd) { } - size_t readUnbuffered(unsigned char * data, size_t len); + size_t readUnbuffered(unsigned char * data, size_t len) override; + bool good() override; +private: + bool _good = true; }; @@ -98,7 +109,7 @@ struct FdSource : BufferedSource struct StringSink : Sink { string s; - void operator () (const unsigned char * data, size_t len); + void operator () (const unsigned char * data, size_t len) override; }; @@ -108,7 +119,7 @@ struct StringSource : Source const string & s; size_t pos; StringSource(const string & _s) : s(_s), pos(0) { } - size_t read(unsigned char * data, size_t len); + size_t read(unsigned char * data, size_t len) override; }; |