1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
#include "filetransfer.hh"
#include "compression.hh"
#include <cstdint>
#include <exception>
#include <future>
#include <gtest/gtest.h>
#include <netinet/in.h>
#include <stdexcept>
#include <string_view>
#include <sys/poll.h>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
// local server tests don't work on darwin without some incantations
// the horrors do not want to look up. contributions welcome though!
#if __APPLE__
#define NOT_ON_DARWIN(n) DISABLED_##n
#else
#define NOT_ON_DARWIN(n) n
#endif
using namespace std::chrono_literals;
namespace {
struct Reply {
std::string status, headers;
std::function<std::string()> content;
};
}
namespace nix {
static std::tuple<uint16_t, AutoCloseFD>
serveHTTP(std::vector<Reply> replies)
{
AutoCloseFD listener(::socket(AF_INET6, SOCK_STREAM, 0));
if (!listener) {
throw SysError(errno, "socket() failed");
}
Pipe trigger;
trigger.create();
sockaddr_in6 addr = {
.sin6_family = AF_INET6,
.sin6_addr = IN6ADDR_LOOPBACK_INIT,
};
socklen_t len = sizeof(addr);
if (::bind(listener.get(), reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) < 0) {
throw SysError(errno, "bind() failed");
}
if (::getsockname(listener.get(), reinterpret_cast<sockaddr *>(&addr), &len) < 0) {
throw SysError(errno, "getsockname() failed");
}
if (::listen(listener.get(), 1) < 0) {
throw SysError(errno, "listen() failed");
}
std::thread(
[replies, at{0}](AutoCloseFD socket, AutoCloseFD trigger) mutable {
while (true) {
pollfd pfds[2] = {
{
.fd = socket.get(),
.events = POLLIN,
},
{
.fd = trigger.get(),
.events = POLLHUP,
},
};
if (::poll(pfds, 2, -1) <= 0) {
throw SysError(errno, "poll() failed");
}
if (pfds[1].revents & POLLHUP) {
return;
}
if (!(pfds[0].revents & POLLIN)) {
continue;
}
AutoCloseFD conn(::accept(socket.get(), nullptr, nullptr));
if (!conn) {
throw SysError(errno, "accept() failed");
}
auto send = [&](std::string_view bit) {
while (!bit.empty()) {
auto written = ::write(conn.get(), bit.data(), bit.size());
if (written < 0) {
throw SysError(errno, "write() failed");
}
bit.remove_prefix(written);
}
};
const auto & reply = replies[at++ % replies.size()];
send("HTTP/1.1 ");
send(reply.status);
send("\r\n");
send(reply.headers);
send("\r\n");
send(reply.content());
::shutdown(conn.get(), SHUT_RDWR);
}
},
std::move(listener),
std::move(trigger.readSide)
)
.detach();
return {
ntohs(addr.sin6_port),
std::move(trigger.writeSide),
};
}
static std::tuple<uint16_t, AutoCloseFD>
serveHTTP(std::string status, std::string headers, std::function<std::string()> content)
{
return serveHTTP({{{status, headers, content}}});
}
TEST(FileTransfer, exceptionAbortsDownload)
{
struct Done
{};
auto ft = makeFileTransfer();
LambdaSink broken([](auto block) { throw Done(); });
ASSERT_THROW(ft->download(FileTransferRequest("file:///dev/zero"), broken), Done);
// makeFileTransfer returns a ref<>, which cannot be cleared. since we also
// can't default-construct it we'll have to overwrite it instead, but we'll
// take the raw pointer out first so we can destroy it in a detached thread
// (otherwise a failure will stall the process and have it killed by meson)
auto reset = std::async(std::launch::async, [&]() { ft = makeFileTransfer(); });
EXPECT_EQ(reset.wait_for(10s), std::future_status::ready);
// if this did time out we have to leak `reset`.
if (reset.wait_for(0s) == std::future_status::timeout) {
(void) new auto(std::move(reset));
}
}
TEST(FileTransfer, NOT_ON_DARWIN(reportsSetupErrors))
{
auto [port, srv] = serveHTTP("404 not found", "", [] { return ""; });
auto ft = makeFileTransfer();
ASSERT_THROW(
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port))),
FileTransferError);
}
TEST(FileTransfer, NOT_ON_DARWIN(reportsTransferError))
{
auto [port, srv] = serveHTTP("200 ok", "content-length: 100\r\n", [] {
std::this_thread::sleep_for(10ms);
return "";
});
auto ft = makeFileTransfer();
FileTransferRequest req(fmt("http://[::1]:%d/index", port));
req.baseRetryTimeMs = 0;
ASSERT_THROW(ft->download(req), FileTransferError);
}
TEST(FileTransfer, NOT_ON_DARWIN(handlesContentEncoding))
{
std::string original = "Test data string";
std::string compressed = compress("gzip", original);
auto [port, srv] = serveHTTP("200 ok", "content-encoding: gzip\r\n", [&] { return compressed; });
auto ft = makeFileTransfer();
StringSink sink;
ft->download(FileTransferRequest(fmt("http://[::1]:%d/index", port)), sink);
EXPECT_EQ(sink.s, original);
}
TEST(FileTransfer, usesIntermediateLinkHeaders)
{
auto [port, srv] = serveHTTP({
{"301 ok",
"location: /second\r\n"
"content-length: 0\r\n",
[] { return ""; }},
{"307 ok",
"location: /third\r\n"
"content-length: 0\r\n",
[] { return ""; }},
{"307 ok",
"location: /fourth\r\n"
"link: <http://foo>; rel=\"immutable\"\r\n"
"content-length: 0\r\n",
[] { return ""; }},
{"200 ok", "content-length: 1\r\n", [] { return "a"; }},
});
auto ft = makeFileTransfer();
FileTransferRequest req(fmt("http://[::1]:%d/first", port));
req.baseRetryTimeMs = 0;
auto result = ft->download(req);
ASSERT_EQ(result.immutableUrl, "http://foo");
}
}
|