aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/filetransfer.cc33
1 files changed, 10 insertions, 23 deletions
diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc
index 9ef220769..7d36796f4 100644
--- a/src/libstore/filetransfer.cc
+++ b/src/libstore/filetransfer.cc
@@ -504,11 +504,6 @@ struct curlFileTransfer : public FileTransfer
Sync<State> state_;
- /* We can't use a std::condition_variable to wake up the curl
- thread, because it only monitors file descriptors. So use a
- pipe instead. */
- Pipe wakeupPipe;
-
std::thread workerThread;
curlFileTransfer()
@@ -523,9 +518,6 @@ struct curlFileTransfer : public FileTransfer
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
fileTransferSettings.httpConnections.get());
- wakeupPipe.create();
- fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
-
workerThread = std::thread([&]() { workerThreadEntry(); });
}
@@ -538,6 +530,12 @@ struct curlFileTransfer : public FileTransfer
if (curlm) curl_multi_cleanup(curlm);
}
+ void wakeup()
+ {
+ if (auto mc = curl_multi_wakeup(curlm))
+ throw nix::Error("unexpected error from curl_multi_wakeup(): %s", curl_multi_strerror(mc));
+ }
+
void stopWorkerThread()
{
/* Signal the worker thread to exit. */
@@ -545,7 +543,7 @@ struct curlFileTransfer : public FileTransfer
auto state(state_.lock());
state->quit = true;
}
- writeFull(wakeupPipe.writeSide.get(), " ", false);
+ wakeup();
}
void workerThreadMain()
@@ -587,32 +585,21 @@ struct curlFileTransfer : public FileTransfer
}
/* Wait for activity, including wakeup events. */
- int numfds = 0;
- struct curl_waitfd extraFDs[1];
- extraFDs[0].fd = wakeupPipe.readSide.get();
- extraFDs[0].events = CURL_WAIT_POLLIN;
- extraFDs[0].revents = 0;
long maxSleepTimeMs = items.empty() ? 10000 : 100;
auto sleepTimeMs =
nextWakeup != std::chrono::steady_clock::time_point()
? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
: maxSleepTimeMs;
vomit("download thread waiting for %d ms", sleepTimeMs);
- mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
+ mc = curl_multi_poll(curlm, nullptr, 0, sleepTimeMs, nullptr);
if (mc != CURLM_OK)
- throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc));
+ throw nix::Error("unexpected error from curl_multi_poll(): %s", curl_multi_strerror(mc));
nextWakeup = std::chrono::steady_clock::time_point();
/* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a
retry timeout to expire). */
- if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
- char buf[1024];
- auto res = read(extraFDs[0].fd, buf, sizeof(buf));
- if (res == -1 && errno != EINTR)
- throw SysError("reading curl wakeup socket");
- }
std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now();
@@ -675,7 +662,7 @@ struct curlFileTransfer : public FileTransfer
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item);
}
- writeFull(wakeupPipe.writeSide.get(), " ");
+ wakeup();
}
#if ENABLE_S3