aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/libstore/binary-cache-store.cc20
-rw-r--r--src/libutil/compression.cc166
-rw-r--r--src/libutil/compression.hh6
-rw-r--r--src/libutil/serialise.hh18
-rw-r--r--src/libutil/util.cc75
-rw-r--r--src/libutil/util.hh8
6 files changed, 206 insertions, 87 deletions
diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc
index d1b278b8e..2e9a13e56 100644
--- a/src/libstore/binary-cache-store.cc
+++ b/src/libstore/binary-cache-store.cc
@@ -203,22 +203,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
stats.narRead++;
stats.narReadCompressedBytes += nar->size();
- /* Decompress the NAR. FIXME: would be nice to have the remote
- side do this. */
- try {
- nar = decompress(info->compression, *nar);
- } catch (UnknownCompressionMethod &) {
- throw Error(format("binary cache path '%s' uses unknown compression method '%s'")
- % storePath % info->compression);
- }
+ uint64_t narSize = 0;
- stats.narReadBytes += nar->size();
+ StringSource source(*nar);
- printMsg(lvlTalkative, format("exporting path '%1%' (%2% bytes)") % storePath % nar->size());
+ LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
+ sink(data, len);
+ narSize += len;
+ });
- assert(nar->size() % 8 == 0);
+ decompress(info->compression, source, wrapperSink);
- sink((unsigned char *) nar->c_str(), nar->size());
+ stats.narReadBytes += narSize;
}
void BinaryCacheStore::queryPathInfoUncached(const Path & storePath,
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc
index 470c925ed..e7fedcbdc 100644
--- a/src/libutil/compression.cc
+++ b/src/libutil/compression.cc
@@ -17,7 +17,23 @@
namespace nix {
-static ref<std::string> decompressXZ(const std::string & in)
+static const size_t bufSize = 32 * 1024;
+
+static void decompressNone(Source & source, Sink & sink)
+{
+ std::vector<unsigned char> buf(bufSize);
+ while (true) {
+ size_t n;
+ try {
+ n = source.read(buf.data(), buf.size());
+ } catch (EndOfFile &) {
+ break;
+ }
+ sink(buf.data(), n);
+ }
+}
+
+static void decompressXZ(Source & source, Sink & sink)
{
lzma_stream strm(LZMA_STREAM_INIT);
@@ -29,36 +45,44 @@ static ref<std::string> decompressXZ(const std::string & in)
Finally free([&]() { lzma_end(&strm); });
lzma_action action = LZMA_RUN;
- uint8_t outbuf[BUFSIZ];
- ref<std::string> res = make_ref<std::string>();
- strm.next_in = (uint8_t *) in.c_str();
- strm.avail_in = in.size();
- strm.next_out = outbuf;
- strm.avail_out = sizeof(outbuf);
+ std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+ strm.next_in = nullptr;
+ strm.avail_in = 0;
+ strm.next_out = outbuf.data();
+ strm.avail_out = outbuf.size();
+ bool eof = false;
while (true) {
checkInterrupt();
+ if (strm.avail_in == 0 && !eof) {
+ strm.next_in = inbuf.data();
+ try {
+ strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+ } catch (EndOfFile &) {
+ eof = true;
+ }
+ }
+
if (strm.avail_in == 0)
action = LZMA_FINISH;
lzma_ret ret = lzma_code(&strm, action);
- if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
- res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
- strm.next_out = outbuf;
- strm.avail_out = sizeof(outbuf);
+ if (strm.avail_out < outbuf.size()) {
+ sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+ strm.next_out = outbuf.data();
+ strm.avail_out = outbuf.size();
}
- if (ret == LZMA_STREAM_END)
- return res;
+ if (ret == LZMA_STREAM_END) return;
if (ret != LZMA_OK)
throw CompressionError("error %d while decompressing xz file", ret);
}
}
-static ref<std::string> decompressBzip2(const std::string & in)
+static void decompressBzip2(Source & source, Sink & sink)
{
bz_stream strm;
memset(&strm, 0, sizeof(strm));
@@ -69,39 +93,50 @@ static ref<std::string> decompressBzip2(const std::string & in)
Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
- char outbuf[BUFSIZ];
- ref<std::string> res = make_ref<std::string>();
- strm.next_in = (char *) in.c_str();
- strm.avail_in = in.size();
- strm.next_out = outbuf;
- strm.avail_out = sizeof(outbuf);
+ std::vector<char> inbuf(bufSize), outbuf(bufSize);
+ strm.next_in = nullptr;
+ strm.avail_in = 0;
+ strm.next_out = outbuf.data();
+ strm.avail_out = outbuf.size();
+ bool eof = false;
while (true) {
checkInterrupt();
+ if (strm.avail_in == 0 && !eof) {
+ strm.next_in = inbuf.data();
+ try {
+ strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
+ } catch (EndOfFile &) {
+ eof = true;
+ }
+ }
+
int ret = BZ2_bzDecompress(&strm);
- if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
- res->append(outbuf, sizeof(outbuf) - strm.avail_out);
- strm.next_out = outbuf;
- strm.avail_out = sizeof(outbuf);
+ if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
+ throw CompressionError("bzip2 data ends prematurely");
+
+ if (strm.avail_out < outbuf.size()) {
+ sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
+ strm.next_out = outbuf.data();
+ strm.avail_out = outbuf.size();
}
- if (ret == BZ_STREAM_END)
- return res;
+ if (ret == BZ_STREAM_END) return;
if (ret != BZ_OK)
throw CompressionError("error while decompressing bzip2 file");
-
- if (strm.avail_in == 0)
- throw CompressionError("bzip2 data ends prematurely");
}
}
-static ref<std::string> decompressBrotli(const std::string & in)
+static void decompressBrotli(Source & source, Sink & sink)
{
#if !HAVE_BROTLI
- return make_ref<std::string>(runProgram(BROTLI, true, {"-d"}, {in}));
+ RunOptions options(BROTLI, {"-d"});
+ options.stdin = &source;
+ options.stdout = &sink;
+ runProgram2(options);
#else
auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
if (!s)
@@ -109,16 +144,26 @@ static ref<std::string> decompressBrotli(const std::string & in)
Finally free([s]() { BrotliDecoderDestroyInstance(s); });
- uint8_t outbuf[BUFSIZ];
- ref<std::string> res = make_ref<std::string>();
- const uint8_t *next_in = (uint8_t *)in.c_str();
- size_t avail_in = in.size();
- uint8_t *next_out = outbuf;
- size_t avail_out = sizeof(outbuf);
+ std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
+ const uint8_t * next_in = nullptr;
+ size_t avail_in = 0;
+ bool eof = false;
while (true) {
checkInterrupt();
+ if (avail_in == 0 && !eof) {
+ next_in = inbuf.data();
+ try {
+ avail_in = source.read((unsigned char *) next_in, inbuf.size());
+ } catch (EndOfFile &) {
+ eof = true;
+ }
+ }
+
+ uint8_t * next_out = outbuf.data();
+ size_t avail_out = outbuf.size();
+
auto ret = BrotliDecoderDecompressStream(s,
&avail_in, &next_in,
&avail_out, &next_out,
@@ -128,51 +173,49 @@ static ref<std::string> decompressBrotli(const std::string & in)
case BROTLI_DECODER_RESULT_ERROR:
throw CompressionError("error while decompressing brotli file");
case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
- throw CompressionError("incomplete or corrupt brotli file");
+ if (eof)
+ throw CompressionError("incomplete or corrupt brotli file");
+ break;
case BROTLI_DECODER_RESULT_SUCCESS:
if (avail_in != 0)
throw CompressionError("unexpected input after brotli decompression");
break;
case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
// I'm not sure if this can happen, but abort if this happens with empty buffer
- if (avail_out == sizeof(outbuf))
+ if (avail_out == outbuf.size())
throw CompressionError("brotli decompression requires larger buffer");
break;
}
// Always ensure we have full buffer for next invocation
- if (avail_out < sizeof(outbuf)) {
- res->append((char*)outbuf, sizeof(outbuf) - avail_out);
- next_out = outbuf;
- avail_out = sizeof(outbuf);
- }
+ if (avail_out < outbuf.size())
+ sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
- if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res;
+ if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
}
#endif // HAVE_BROTLI
}
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+ref<std::string> decompress(const std::string & method, const std::string & in)
{
- StringSink ssink;
- auto sink = makeCompressionSink(method, ssink, parallel);
- (*sink)(in);
- sink->finish();
- return ssink.s;
+ StringSource source(in);
+ StringSink sink;
+ decompress(method, source, sink);
+ return sink.s;
}
-ref<std::string> decompress(const std::string & method, const std::string & in)
+void decompress(const std::string & method, Source & source, Sink & sink)
{
if (method == "none")
- return make_ref<std::string>(in);
+ return decompressNone(source, sink);
else if (method == "xz")
- return decompressXZ(in);
+ return decompressXZ(source, sink);
else if (method == "bzip2")
- return decompressBzip2(in);
+ return decompressBzip2(source, sink);
else if (method == "br")
- return decompressBrotli(in);
+ return decompressBrotli(source, sink);
else
- throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
+ throw UnknownCompressionMethod("unknown compression method '%s'", method);
}
struct NoneSink : CompressionSink
@@ -499,4 +542,13 @@ ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & next
throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
}
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
+{
+ StringSink ssink;
+ auto sink = makeCompressionSink(method, ssink, parallel);
+ (*sink)(in);
+ sink->finish();
+ return ssink.s;
+}
+
}
diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh
index a0d7530d7..f7a3e3fbd 100644
--- a/src/libutil/compression.hh
+++ b/src/libutil/compression.hh
@@ -8,10 +8,12 @@
namespace nix {
-ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
-
ref<std::string> decompress(const std::string & method, const std::string & in);
+void decompress(const std::string & method, Source & source, Sink & sink);
+
+ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
+
struct CompressionSink : BufferedSink
{
virtual void finish() = 0;
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh
index 2ea5b6354..103b05767 100644
--- a/src/libutil/serialise.hh
+++ b/src/libutil/serialise.hh
@@ -56,7 +56,7 @@ struct Source
void operator () (unsigned char * data, size_t len);
/* Store up to ‘len’ in the buffer pointed to by ‘data’, and
- return the number of bytes stored. If blocks until at least
+ return the number of bytes stored. It blocks until at least
one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
@@ -175,6 +175,22 @@ struct TeeSource : Source
};
+/* Convert a function into a sink. */
+struct LambdaSink : Sink
+{
+ typedef std::function<void(const unsigned char *, size_t)> lambda_t;
+
+ lambda_t lambda;
+
+ LambdaSink(const lambda_t & lambda) : lambda(lambda) { }
+
+ virtual void operator () (const unsigned char * data, size_t len)
+ {
+ lambda(data, len);
+ }
+};
+
+
void writePadding(size_t len, Sink & sink);
void writeString(const unsigned char * buf, size_t len, Sink & sink);
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index a60ba8508..37a35ab23 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -3,6 +3,7 @@
#include "affinity.hh"
#include "sync.hh"
#include "finally.hh"
+#include "serialise.hh"
#include <cctype>
#include <cerrno>
@@ -568,19 +569,25 @@ void writeFull(int fd, const string & s, bool allowInterrupts)
string drainFD(int fd)
{
- string result;
- unsigned char buffer[4096];
+ StringSink sink;
+ drainFD(fd, sink);
+ return std::move(*sink.s);
+}
+
+
+void drainFD(int fd, Sink & sink)
+{
+ std::vector<unsigned char> buf(4096);
while (1) {
checkInterrupt();
- ssize_t rd = read(fd, buffer, sizeof buffer);
+ ssize_t rd = read(fd, buf.data(), buf.size());
if (rd == -1) {
if (errno != EINTR)
throw SysError("reading from file");
}
else if (rd == 0) break;
- else result.append((char *) buffer, rd);
+ else sink(buf.data(), rd);
}
- return result;
}
@@ -920,20 +927,47 @@ string runProgram(Path program, bool searchPath, const Strings & args,
return res.second;
}
-std::pair<int, std::string> runProgram(const RunOptions & options)
+std::pair<int, std::string> runProgram(const RunOptions & options_)
+{
+ RunOptions options(options_);
+ StringSink sink;
+ options.stdout = &sink;
+
+ int status = 0;
+
+ try {
+ runProgram2(options);
+ } catch (ExecError & e) {
+ status = e.status;
+ }
+
+ return {status, std::move(*sink.s)};
+}
+
+void runProgram2(const RunOptions & options)
{
checkInterrupt();
+ assert(!(options.stdin && options.input));
+
+ std::unique_ptr<Source> source_;
+ Source * source = options.stdin;
+
+ if (options.input) {
+ source_ = std::make_unique<StringSource>(*options.input);
+ source = source_.get();
+ }
+
/* Create a pipe. */
Pipe out, in;
- out.create();
- if (options.input) in.create();
+ if (options.stdout) out.create();
+ if (source) in.create();
/* Fork. */
Pid pid = startProcess([&]() {
- if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
+ if (options.stdout && dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
throw SysError("dupping stdout");
- if (options.input && dup2(in.readSide.get(), STDIN_FILENO) == -1)
+ if (source && dup2(in.readSide.get(), STDIN_FILENO) == -1)
throw SysError("dupping stdin");
Strings args_(options.args);
@@ -961,11 +995,20 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
});
- if (options.input) {
+ if (source) {
in.readSide = -1;
writerThread = std::thread([&]() {
try {
- writeFull(in.writeSide.get(), *options.input);
+ std::vector<unsigned char> buf(8 * 1024);
+ while (true) {
+ size_t n;
+ try {
+ n = source->read(buf.data(), buf.size());
+ } catch (EndOfFile &) {
+ break;
+ }
+ writeFull(in.writeSide.get(), buf.data(), n);
+ }
promise.set_value();
} catch (...) {
promise.set_exception(std::current_exception());
@@ -974,15 +1017,17 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
});
}
- string result = drainFD(out.readSide.get());
+ if (options.stdout)
+ drainFD(out.readSide.get(), *options.stdout);
/* Wait for the child to finish. */
int status = pid.wait();
/* Wait for the writer thread to finish. */
- if (options.input) promise.get_future().get();
+ if (source) promise.get_future().get();
- return {status, result};
+ if (status)
+ throw ExecError(status, fmt("program '%1%' %2%", options.program, statusToString(status)));
}
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 500ab7811..1ea1027ac 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -25,6 +25,9 @@
namespace nix {
+struct Sink;
+struct Source;
+
/* Return an environment variable. */
string getEnv(const string & key, const string & def = "");
@@ -150,6 +153,7 @@ MakeError(EndOfFile, Error)
/* Read a file descriptor until EOF occurs. */
string drainFD(int fd);
+void drainFD(int fd, Sink & sink);
/* Automatic cleanup of resources. */
@@ -256,6 +260,8 @@ struct RunOptions
bool searchPath = true;
Strings args;
std::experimental::optional<std::string> input;
+ Source * stdin = nullptr;
+ Sink * stdout = nullptr;
bool _killStderr = false;
RunOptions(const Path & program, const Strings & args)
@@ -266,6 +272,8 @@ struct RunOptions
std::pair<int, std::string> runProgram(const RunOptions & options);
+void runProgram2(const RunOptions & options);
+
class ExecError : public Error
{