aboutsummaryrefslogtreecommitdiff
path: root/src/libstore/download.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstore/download.cc')
-rw-r--r--src/libstore/download.cc29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/libstore/download.cc b/src/libstore/download.cc
index 97e9b0b2f..317ec37d1 100644
--- a/src/libstore/download.cc
+++ b/src/libstore/download.cc
@@ -10,6 +10,7 @@
#include <curl/curl.h>
+#include <queue>
#include <iostream>
#include <thread>
#include <cmath>
@@ -281,8 +282,13 @@ struct CurlDownloader : public Downloader
struct State
{
+ struct EmbargoComparator {
+ bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
+ return i1->embargo > i2->embargo;
+ }
+ };
bool quit = false;
- std::vector<std::shared_ptr<DownloadItem>> incoming;
+ std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
};
Sync<State> state_;
@@ -380,9 +386,7 @@ struct CurlDownloader : public Downloader
/* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a
- retry timeout to expire). FIXME: should use a priority
- queue for the embargoed items to prevent repeated O(n)
- checks. */
+ retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
@@ -390,22 +394,23 @@ struct CurlDownloader : public Downloader
throw SysError("reading curl wakeup socket");
}
- std::vector<std::shared_ptr<DownloadItem>> incoming, embargoed;
+ std::vector<std::shared_ptr<DownloadItem>> incoming;
auto now = std::chrono::steady_clock::now();
{
auto state(state_.lock());
- for (auto & item: state->incoming) {
- if (item->embargo <= now)
+ while (!state->incoming.empty()) {
+ auto item = state->incoming.top();
+ if (item->embargo <= now) {
incoming.push_back(item);
- else {
- embargoed.push_back(item);
+ state->incoming.pop();
+ } else {
if (nextWakeup == std::chrono::steady_clock::time_point()
|| item->embargo < nextWakeup)
nextWakeup = item->embargo;
+ break;
}
}
- state->incoming = embargoed;
quit = state->quit;
}
@@ -432,7 +437,7 @@ struct CurlDownloader : public Downloader
{
auto state(state_.lock());
- state->incoming.clear();
+ while (!state->incoming.empty()) state->incoming.pop();
state->quit = true;
}
}
@@ -443,7 +448,7 @@ struct CurlDownloader : public Downloader
auto state(state_.lock());
if (state->quit)
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
- state->incoming.push_back(item);
+ state->incoming.push(item);
}
writeFull(wakeupPipe.writeSide.get(), " ");
}