aboutsummaryrefslogtreecommitdiff
path: root/src/libstore
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2021-08-20 11:18:35 +0200
committerEelco Dolstra <edolstra@gmail.com>2021-10-13 12:12:44 +0200
commit262520fcfe2544a7278b6b5967d0d8b605fd89d9 (patch)
tree05750f4917f309c33d50b7dd6265da3879ed4501 /src/libstore
parentff453b06f94b4305694beac8255d9ff51bed1a63 (diff)
Use a thread per connection
Diffstat (limited to 'src/libstore')
-rw-r--r--src/libstore/gc.cc127
1 files changed, 60 insertions, 67 deletions
diff --git a/src/libstore/gc.cc b/src/libstore/gc.cc
index ff66c3938..bb76ee084 100644
--- a/src/libstore/gc.cc
+++ b/src/libstore/gc.cc
@@ -636,6 +636,7 @@ void LocalStore::removeUnusedLinks(const GCState & state)
}
+
void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
{
GCState state(options, results);
@@ -674,83 +675,75 @@ void LocalStore::collectGarbage(const GCOptions & options, GCResults & results)
shutdownPipe.create();
std::thread serverThread([&]() {
- std::map<int, std::pair<std::unique_ptr<AutoCloseFD>, std::string>> fdClients;
- bool quit = false;
+ Sync<std::map<int, std::thread>> connections;
+ std::atomic_bool quit = false;
+
+ Finally cleanup([&]() {
+ debug("GC roots server shutting down");
+ while (true) {
+ auto item = remove_begin(*connections.lock());
+ if (!item) break;
+ auto & [fd, thread] = *item;
+ shutdown(fd, SHUT_RDWR);
+ thread.join();
+ }
+ });
- while (!quit) {
+ while (true) {
std::vector<struct pollfd> fds;
fds.push_back({.fd = shutdownPipe.readSide.get(), .events = POLLIN});
fds.push_back({.fd = fdServer.get(), .events = POLLIN});
- for (auto & i : fdClients)
- fds.push_back({.fd = i.first, .events = POLLIN});
auto count = poll(fds.data(), fds.size(), -1);
assert(count != -1);
- for (auto & fd : fds) {
- if (!fd.revents) continue;
- if (fd.fd == shutdownPipe.readSide.get())
- /* Parent is asking us to quit. */
- quit = true;
- else if (fd.fd == fdServer.get()) {
- /* Accept a new connection. */
- assert(fd.revents & POLLIN);
- auto fdClient = std::make_unique<AutoCloseFD>(accept(fdServer.get(), nullptr, nullptr));
- if (*fdClient) {
- auto fd = fdClient->get();
- fdClients.insert({fd, std::make_pair(std::move(fdClient), "")});
+ if (fds[0].revents)
+ /* Parent is asking us to quit. */
+ break;
+
+ if (fds[1].revents) {
+ /* Accept a new connection. */
+ assert(fds[1].revents & POLLIN);
+ AutoCloseFD fdClient = accept(fdServer.get(), nullptr, nullptr);
+ if (!fdClient) continue;
+
+ /* Process the connection in a separate thread. */
+ auto fdClient_ = fdClient.get();
+ std::thread clientThread([&, fdClient = std::move(fdClient)]() {
+ Finally cleanup([&]() {
+ connections.lock()->erase(fdClient.get());
+ });
+
+ while (true) {
+ try {
+ auto path = readLine(fdClient.get());
+ auto storePath = maybeParseStorePath(path);
+ if (storePath) {
+ debug("got new GC root '%s'", path);
+ auto hashPart = std::string(storePath->hashPart());
+ auto shared(state.shared.lock());
+ shared->tempRoots.insert(hashPart);
+ /* If this path is currently being
+ deleted, then we have to wait until
+ deletion is finished to ensure that
+ the client doesn't start
+ re-creating it before we're
+ done. FIXME: ideally we would use a
+ FD for this so we don't block the
+ poll loop. */
+ while (shared->pending == hashPart) {
+ debug("synchronising with deletion of path '%s'", path);
+ shared.wait(state.wakeup);
+ }
+ } else
+ printError("received garbage instead of a root from client");
+ writeFull(fdClient.get(), "1", false);
+ } catch (Error &) { break; }
}
- }
- else {
- /* Receive data from a client. */
- auto fdClient = fdClients.find(fd.fd);
- assert(fdClient != fdClients.end());
- if (fd.revents & POLLIN) {
- char buf[16384];
- auto n = read(fd.fd, buf, sizeof(buf));
- if (n > 0) {
- fdClient->second.second.append(buf, n);
- /* Split the input into lines. */
- while (true) {
- auto p = fdClient->second.second.find('\n');
- if (p == std::string::npos) break;
- /* We got a full line. Send ack back
- to the client. */
- auto path = fdClient->second.second.substr(0, p);
- fdClient->second.second = fdClient->second.second.substr(p + 1);
- auto storePath = maybeParseStorePath(path);
- if (storePath) {
- debug("got new GC root '%s'", path);
- auto hashPart = std::string(storePath->hashPart());
- auto shared(state.shared.lock());
- shared->tempRoots.insert(hashPart);
- /* If this path is currently being
- deleted, then we have to wait
- until deletion is finished to
- ensure that the client doesn't
- start re-creating it before
- we're done. FIXME: ideally we
- would use a FD for this so we
- don't block the poll loop. */
- while (shared->pending == hashPart) {
- debug("synchronising with deletion of path '%s'", path);
- shared.wait(state.wakeup);
- }
- } else
- printError("received garbage instead of a root from client");
- // This could block, but meh.
- try {
- writeFull(fd.fd, "1", false);
- } catch (SysError &) { }
- }
- } else if (n == 0)
- fdClients.erase(fdClient);
- } else
- fdClients.erase(fdClient);
- }
+ });
+
+ connections.lock()->insert({fdClient_, std::move(clientThread)});
}
}
-
- debug("GC roots server shut down");
});
Finally stopServer([&]() {