aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEelco Dolstra <eelco.dolstra@logicblox.com>2016-02-24 11:39:56 +0100
committerEelco Dolstra <eelco.dolstra@logicblox.com>2016-02-24 11:39:56 +0100
commit5f862658c3f8e518fb631d0536f2b38f107970e1 (patch)
tree34a38a2405fda244f547b06c6db264f3986e1aab
parentd5626bf4c14f725136f2c5b6ac8bf818627352f0 (diff)
Remove bad daemon connections from the pool
This is necessary for long-running processes like hydra-queue-runner: if a nix-daemon worker is killed, we need to stop reusing that connection.
-rw-r--r--src/libstore/remote-store.cc6
-rw-r--r--src/libutil/pool.hh18
-rw-r--r--src/libutil/serialise.cc22
-rw-r--r--src/libutil/serialise.hh23
4 files changed, 55 insertions, 14 deletions
diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc
index f6ec3fffb..2f540c640 100644
--- a/src/libstore/remote-store.cc
+++ b/src/libstore/remote-store.cc
@@ -40,7 +40,11 @@ template PathSet readStorePaths(Source & from);
RemoteStore::RemoteStore(size_t maxConnections)
- : connections(make_ref<Pool<Connection>>(maxConnections, [this]() { return openConnection(); }))
+ : connections(make_ref<Pool<Connection>>(
+ maxConnections,
+ [this]() { return openConnection(); },
+ [](const ref<Connection> & r) { return r->to.good() && r->from.good(); }
+ ))
{
}
diff --git a/src/libutil/pool.hh b/src/libutil/pool.hh
index 4b865a193..b75a3cbf8 100644
--- a/src/libutil/pool.hh
+++ b/src/libutil/pool.hh
@@ -33,11 +33,17 @@ class Pool
{
public:
+ /* A function that produces new instances of R on demand. */
typedef std::function<ref<R>()> Factory;
+ /* A function that checks whether an instance of R is still
+ usable. Unusable instances are removed from the pool. */
+ typedef std::function<bool(const ref<R> &)> Validator;
+
private:
Factory factory;
+ Validator validator;
struct State
{
@@ -53,8 +59,10 @@ private:
public:
Pool(size_t max = std::numeric_limits<size_t>::max,
- const Factory & factory = []() { return make_ref<R>(); })
+ const Factory & factory = []() { return make_ref<R>(); },
+ const Validator & validator = [](ref<R> r) { return true; })
: factory(factory)
+ , validator(validator)
{
auto state_(state.lock());
state_->max = max;
@@ -109,11 +117,13 @@ public:
while (state_->idle.empty() && state_->inUse >= state_->max)
state_.wait(wakeup);
- if (!state_->idle.empty()) {
+ while (!state_->idle.empty()) {
auto p = state_->idle.back();
state_->idle.pop_back();
- state_->inUse++;
- return Handle(*this, p);
+ if (validator(p)) {
+ state_->inUse++;
+ return Handle(*this, p);
+ }
}
state_->inUse++;
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index f136a1324..c9620e2bf 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -72,7 +72,17 @@ void FdSink::write(const unsigned char * data, size_t len)
warned = true;
}
}
- writeFull(fd, data, len);
+ try {
+ writeFull(fd, data, len);
+ } catch (SysError & e) {
+ _good = true;
+ }
+}
+
+
+bool FdSink::good()
+{
+ return _good;
}
@@ -119,12 +129,18 @@ size_t FdSource::readUnbuffered(unsigned char * data, size_t len)
checkInterrupt();
n = ::read(fd, (char *) data, bufSize);
} while (n == -1 && errno == EINTR);
- if (n == -1) throw SysError("reading from file");
- if (n == 0) throw EndOfFile("unexpected end-of-file");
+ if (n == -1) { _good = false; throw SysError("reading from file"); }
+ if (n == 0) { _good = false; throw EndOfFile("unexpected end-of-file"); }
return n;
}
+bool FdSource::good()
+{
+ return _good;
+}
+
+
size_t StringSource::read(unsigned char * data, size_t len)
{
if (pos == s.size()) throw EndOfFile("end of string reached");
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 979ff849f..9e269f392 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -12,6 +12,7 @@ struct Sink
{
virtual ~Sink() { }
virtual void operator () (const unsigned char * data, size_t len) = 0;
+ virtual bool good() { return true; }
};
@@ -25,7 +26,7 @@ struct BufferedSink : Sink
: bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink();
- void operator () (const unsigned char * data, size_t len);
+ void operator () (const unsigned char * data, size_t len) override;
void flush();
@@ -47,6 +48,8 @@ struct Source
return the number of bytes stored. If blocks until at least
one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
+
+ virtual bool good() { return true; }
};
@@ -60,7 +63,7 @@ struct BufferedSource : Source
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource();
- size_t read(unsigned char * data, size_t len);
+ size_t read(unsigned char * data, size_t len) override;
/* Underlying read call, to be overridden. */
virtual size_t readUnbuffered(unsigned char * data, size_t len) = 0;
@@ -80,7 +83,12 @@ struct FdSink : BufferedSink
FdSink(int fd) : fd(fd), warn(false), written(0) { }
~FdSink();
- void write(const unsigned char * data, size_t len);
+ void write(const unsigned char * data, size_t len) override;
+
+ bool good() override;
+
+private:
+ bool _good = true;
};
@@ -90,7 +98,10 @@ struct FdSource : BufferedSource
int fd;
FdSource() : fd(-1) { }
FdSource(int fd) : fd(fd) { }
- size_t readUnbuffered(unsigned char * data, size_t len);
+ size_t readUnbuffered(unsigned char * data, size_t len) override;
+ bool good() override;
+private:
+ bool _good = true;
};
@@ -98,7 +109,7 @@ struct FdSource : BufferedSource
struct StringSink : Sink
{
string s;
- void operator () (const unsigned char * data, size_t len);
+ void operator () (const unsigned char * data, size_t len) override;
};
@@ -108,7 +119,7 @@ struct StringSource : Source
const string & s;
size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { }
- size_t read(unsigned char * data, size_t len);
+ size_t read(unsigned char * data, size_t len) override;
};