diff options
Diffstat (limited to 'src/libutil/compression.cc')
-rw-r--r-- | src/libutil/compression.cc | 378 |
1 files changed, 84 insertions, 294 deletions
diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index 986ba2976..d26f68fde 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -1,18 +1,17 @@ #include "compression.hh" +#include "tarfile.hh" #include "util.hh" #include "finally.hh" #include "logging.hh" -#include <lzma.h> -#include <bzlib.h> +#include <archive.h> +#include <archive_entry.h> #include <cstdio> #include <cstring> #include <brotli/decode.h> #include <brotli/encode.h> -#include <zlib.h> - #include <iostream> namespace nix { @@ -27,7 +26,7 @@ struct ChunkedCompressionSink : CompressionSink const size_t CHUNK_SIZE = sizeof(outbuf) << 2; while (!data.empty()) { size_t n = std::min(CHUNK_SIZE, data.size()); - writeInternal(data); + writeInternal(data.substr(0, n)); data.remove_prefix(n); } } @@ -35,177 +34,101 @@ struct ChunkedCompressionSink : CompressionSink virtual void writeInternal(std::string_view data) = 0; }; -struct NoneSink : CompressionSink -{ - Sink & nextSink; - NoneSink(Sink & nextSink) : nextSink(nextSink) { } - void finish() override { flush(); } - void write(std::string_view data) override { nextSink(data); } -}; - -struct GzipDecompressionSink : CompressionSink +struct ArchiveDecompressionSource : Source { - Sink & nextSink; - z_stream strm; - bool finished = false; - uint8_t outbuf[BUFSIZ]; - - GzipDecompressionSink(Sink & nextSink) : nextSink(nextSink) - { - strm.zalloc = Z_NULL; - strm.zfree = Z_NULL; - strm.opaque = Z_NULL; - strm.avail_in = 0; - strm.next_in = Z_NULL; - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - - // Enable gzip and zlib decoding (+32) with 15 windowBits - int ret = inflateInit2(&strm,15+32); - if (ret != Z_OK) - throw CompressionError("unable to initialise gzip encoder"); - } - - ~GzipDecompressionSink() - { - inflateEnd(&strm); - } - - void finish() override - { - CompressionSink::flush(); - write({}); - } - - void write(std::string_view data) override - { - assert(data.size() <= std::numeric_limits<decltype(strm.avail_in)>::max()); - - strm.next_in = (Bytef *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - int ret = inflate(&strm,Z_SYNC_FLUSH); - if (ret != Z_OK && ret != Z_STREAM_END) - throw CompressionError("error while decompressing gzip file: %d (%d, %d)", - zError(ret), data.size(), strm.avail_in); - - finished = ret == Z_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (Bytef *) outbuf; - strm.avail_out = sizeof(outbuf); + std::unique_ptr<TarArchive> archive = 0; + Source & src; + ArchiveDecompressionSource(Source & src) : src(src) {} + ~ArchiveDecompressionSource() override {} + size_t read(char * data, size_t len) override { + struct archive_entry * ae; + if (!archive) { + archive = std::make_unique<TarArchive>(src, true); + this->archive->check(archive_read_next_header(this->archive->archive, &ae), + "failed to read header (%s)"); + if (archive_filter_count(this->archive->archive) < 2) { + throw CompressionError("input compression not recognized"); } } + ssize_t result = archive_read_data(this->archive->archive, data, len); + if (result > 0) return result; + if (result == 0) { + throw EndOfFile("reached end of compressed file"); + } + this->archive->check(result, "failed to read compressed data (%s)"); + return result; } }; -struct XzDecompressionSink : CompressionSink +struct ArchiveCompressionSink : CompressionSink { Sink & nextSink; - uint8_t outbuf[BUFSIZ]; - lzma_stream strm = LZMA_STREAM_INIT; - bool finished = false; - - XzDecompressionSink(Sink & nextSink) : nextSink(nextSink) - { - lzma_ret ret = lzma_stream_decoder( - &strm, UINT64_MAX, LZMA_CONCATENATED); - if (ret != LZMA_OK) - throw CompressionError("unable to initialise lzma decoder"); - - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); + struct archive * archive; + + ArchiveCompressionSink(Sink & nextSink, std::string format, bool parallel) : nextSink(nextSink) { + archive = archive_write_new(); + if (!archive) throw Error("failed to initialize libarchive"); + check(archive_write_add_filter_by_name(archive, format.c_str()), "couldn't initialize compression (%s)"); + check(archive_write_set_format_raw(archive)); + if (format == "xz" && parallel) { + check(archive_write_set_filter_option(archive, format.c_str(), "threads", "0")); + } + // disable internal buffering + check(archive_write_set_bytes_per_block(archive, 0)); + // disable output padding + check(archive_write_set_bytes_in_last_block(archive, 1)); + open(); } - ~XzDecompressionSink() + ~ArchiveCompressionSink() override { - lzma_end(&strm); + if (archive) archive_write_free(archive); } void finish() override { - CompressionSink::flush(); - write({}); + flush(); + check(archive_write_close(archive)); } - void write(std::string_view data) override + void check(int err, const std::string & reason = "failed to compress (%s)") { - strm.next_in = (const unsigned char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - lzma_ret ret = lzma_code(&strm, data.data() ? LZMA_RUN : LZMA_FINISH); - if (ret != LZMA_OK && ret != LZMA_STREAM_END) - throw CompressionError("error %d while decompressing xz file", ret); - - finished = ret == LZMA_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - } + if (err == ARCHIVE_EOF) + throw EndOfFile("reached end of archive"); + else if (err != ARCHIVE_OK) + throw Error(reason, archive_error_string(this->archive)); } -}; -struct BzipDecompressionSink : ChunkedCompressionSink -{ - Sink & nextSink; - bz_stream strm; - bool finished = false; - - BzipDecompressionSink(Sink & nextSink) : nextSink(nextSink) + void write(std::string_view data) override { - memset(&strm, 0, sizeof(strm)); - int ret = BZ2_bzDecompressInit(&strm, 0, 0); - if (ret != BZ_OK) - throw CompressionError("unable to initialise bzip2 decoder"); - - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); + ssize_t result = archive_write_data(archive, data.data(), data.length()); + if (result <= 0) check(result); } - ~BzipDecompressionSink() +private: + void open() { - BZ2_bzDecompressEnd(&strm); + check(archive_write_open(archive, this, nullptr, ArchiveCompressionSink::callback_write, nullptr)); + auto ae = archive_entry_new(); + archive_entry_set_filetype(ae, AE_IFREG); + check(archive_write_header(archive, ae)); + archive_entry_free(ae); } - void finish() override + static ssize_t callback_write(struct archive * archive, void * _self, const void * buffer, size_t length) { - flush(); - write({}); + auto self = (ArchiveCompressionSink *) _self; + self->nextSink({(const char *) buffer, length}); + return length; } +}; - void writeInternal(std::string_view data) override - { - assert(data.size() <= std::numeric_limits<decltype(strm.avail_in)>::max()); - - strm.next_in = (char *) data.data(); - strm.avail_in = data.size(); - - while (strm.avail_in) { - checkInterrupt(); - - int ret = BZ2_bzDecompress(&strm); - if (ret != BZ_OK && ret != BZ_STREAM_END) - throw CompressionError("error while decompressing bzip2 file"); - - finished = ret == BZ_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } +struct NoneSink : CompressionSink +{ + Sink & nextSink; + NoneSink(Sink & nextSink) : nextSink(nextSink) { } + void finish() override { flush(); } + void write(std::string_view data) override { nextSink(data); } }; struct BrotliDecompressionSink : ChunkedCompressionSink @@ -268,159 +191,24 @@ ref<std::string> decompress(const std::string & method, const std::string & in) return ssink.s; } -ref<CompressionSink> makeDecompressionSink(const std::string & method, Sink & nextSink) +std::unique_ptr<FinishSink> makeDecompressionSink(const std::string & method, Sink & nextSink) { if (method == "none" || method == "") - return make_ref<NoneSink>(nextSink); - else if (method == "xz") - return make_ref<XzDecompressionSink>(nextSink); - else if (method == "bzip2") - return make_ref<BzipDecompressionSink>(nextSink); - else if (method == "gzip") - return make_ref<GzipDecompressionSink>(nextSink); + return std::make_unique<NoneSink>(nextSink); else if (method == "br") - return make_ref<BrotliDecompressionSink>(nextSink); + return std::make_unique<BrotliDecompressionSink>(nextSink); else - throw UnknownCompressionMethod("unknown compression method '%s'", method); + return sourceToSink([&](Source & source) { + auto decompressionSource = std::make_unique<ArchiveDecompressionSource>(source); + decompressionSource->drainInto(nextSink); + }); } -struct XzCompressionSink : CompressionSink -{ - Sink & nextSink; - uint8_t outbuf[BUFSIZ]; - lzma_stream strm = LZMA_STREAM_INIT; - bool finished = false; - - XzCompressionSink(Sink & nextSink, bool parallel) : nextSink(nextSink) - { - lzma_ret ret; - bool done = false; - - if (parallel) { -#ifdef HAVE_LZMA_MT - lzma_mt mt_options = {}; - mt_options.flags = 0; - mt_options.timeout = 300; // Using the same setting as the xz cmd line - mt_options.preset = LZMA_PRESET_DEFAULT; - mt_options.filters = NULL; - mt_options.check = LZMA_CHECK_CRC64; - mt_options.threads = lzma_cputhreads(); - mt_options.block_size = 0; - if (mt_options.threads == 0) - mt_options.threads = 1; - // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the - // number of threads. - ret = lzma_stream_encoder_mt(&strm, &mt_options); - done = true; -#else - printMsg(lvlError, "warning: parallel XZ compression requested but not supported, falling back to single-threaded compression"); -#endif - } - - if (!done) - ret = lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64); - - if (ret != LZMA_OK) - throw CompressionError("unable to initialise lzma encoder"); - - // FIXME: apply the x86 BCJ filter? - - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~XzCompressionSink() - { - lzma_end(&strm); - } - - void finish() override - { - CompressionSink::flush(); - write({}); - } - - void write(std::string_view data) override - { - strm.next_in = (const unsigned char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - lzma_ret ret = lzma_code(&strm, data.data() ? LZMA_RUN : LZMA_FINISH); - if (ret != LZMA_OK && ret != LZMA_STREAM_END) - throw CompressionError("error %d while compressing xz file", ret); - - finished = ret == LZMA_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(const char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - -struct BzipCompressionSink : ChunkedCompressionSink -{ - Sink & nextSink; - bz_stream strm; - bool finished = false; - - BzipCompressionSink(Sink & nextSink) : nextSink(nextSink) - { - memset(&strm, 0, sizeof(strm)); - int ret = BZ2_bzCompressInit(&strm, 9, 0, 30); - if (ret != BZ_OK) - throw CompressionError("unable to initialise bzip2 encoder"); - - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~BzipCompressionSink() - { - BZ2_bzCompressEnd(&strm); - } - - void finish() override - { - flush(); - writeInternal({}); - } - - void writeInternal(std::string_view data) override - { - assert(data.size() <= std::numeric_limits<decltype(strm.avail_in)>::max()); - - strm.next_in = (char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - int ret = BZ2_bzCompress(&strm, data.data() ? BZ_RUN : BZ_FINISH); - if (ret != BZ_RUN_OK && ret != BZ_FINISH_OK && ret != BZ_STREAM_END) - throw CompressionError("error %d while compressing bzip2 file", ret); - - finished = ret == BZ_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(const char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - struct BrotliCompressionSink : ChunkedCompressionSink { Sink & nextSink; uint8_t outbuf[BUFSIZ]; - BrotliEncoderState *state; + BrotliEncoderState * state; bool finished = false; BrotliCompressionSink(Sink & nextSink) : nextSink(nextSink) @@ -471,12 +259,14 @@ struct BrotliCompressionSink : ChunkedCompressionSink ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel) { + std::vector<std::string> la_supports = { + "bzip2", "compress", "grzip", "gzip", "lrzip", "lz4", "lzip", "lzma", "lzop", "xz", "zstd" + }; + if (std::find(la_supports.begin(), la_supports.end(), method) != la_supports.end()) { + return make_ref<ArchiveCompressionSink>(nextSink, method, parallel); + } if (method == "none") return make_ref<NoneSink>(nextSink); - else if (method == "xz") - return make_ref<XzCompressionSink>(nextSink, parallel); - else if (method == "bzip2") - return make_ref<BzipCompressionSink>(nextSink); else if (method == "br") return make_ref<BrotliCompressionSink>(nextSink); else |