#pragma once ///@file #include #include #include "generator.hh" #include "strings.hh" #include "types.hh" #include "file-descriptor.hh" namespace nix { /** * Abstract destination of binary data. */ struct Sink { virtual ~Sink() { } virtual void operator () (std::string_view data) = 0; virtual bool good() { return true; } }; /** * Just throws away data. */ struct NullSink : Sink { void operator () (std::string_view data) override { } }; struct FinishSink : virtual Sink { virtual void finish() = 0; }; /** * A buffered abstract sink. Warning: a BufferedSink should not be * used from multiple threads concurrently. */ struct BufferedSink : virtual Sink { size_t bufSize, bufPos; std::unique_ptr buffer; BufferedSink(size_t bufSize = 32 * 1024) : bufSize(bufSize), bufPos(0), buffer(nullptr) { } void operator () (std::string_view data) override; void flush(); protected: virtual void writeUnbuffered(std::string_view data) = 0; }; /** * Abstract source of binary data. */ struct Source { virtual ~Source() { } /** * Store exactly ‘len’ bytes in the buffer pointed to by ‘data’. * It blocks until all the requested data is available, or throws * an error if it is not going to be available. */ void operator () (char * data, size_t len); /** * Store up to ‘len’ in the buffer pointed to by ‘data’, and * return the number of bytes stored. It blocks until at least * one byte is available. */ virtual size_t read(char * data, size_t len) = 0; virtual bool good() { return true; } void drainInto(Sink & sink); std::string drain(); }; /** * A buffered abstract source. Warning: a BufferedSource should not be * used from multiple threads concurrently. */ struct BufferedSource : Source { size_t bufSize, bufPosIn, bufPosOut; std::unique_ptr buffer; BufferedSource(size_t bufSize = 32 * 1024) : bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(nullptr) { } size_t read(char * data, size_t len) override; bool hasData(); protected: /** * Underlying read call, to be overridden. 
*/ virtual size_t readUnbuffered(char * data, size_t len) = 0; }; /** * A sink that writes data to a file descriptor. */ struct FdSink : BufferedSink { int fd; size_t written = 0; FdSink() : fd(-1) { } FdSink(int fd) : fd(fd) { } FdSink(FdSink&&) = default; FdSink & operator=(FdSink && s) { flush(); fd = s.fd; s.fd = -1; written = s.written; return *this; } ~FdSink(); void writeUnbuffered(std::string_view data) override; bool good() override; private: bool _good = true; }; /** * A source that reads data from a file descriptor. */ struct FdSource : BufferedSource { int fd; size_t read = 0; BackedStringView endOfFileError{"unexpected end-of-file"}; FdSource() : fd(-1) { } FdSource(int fd) : fd(fd) { } FdSource(FdSource &&) = default; FdSource & operator=(FdSource && s) { fd = s.fd; s.fd = -1; read = s.read; return *this; } bool good() override; protected: size_t readUnbuffered(char * data, size_t len) override; private: bool _good = true; }; /** * A sink that writes data to a string. */ struct StringSink : Sink { std::string s; StringSink() { } explicit StringSink(const size_t reservedSize) { s.reserve(reservedSize); }; StringSink(std::string && s) : s(std::move(s)) { }; void operator () (std::string_view data) override; }; /** * A source that reads data from a string. */ struct StringSource : Source { std::string_view s; size_t pos; StringSource(std::string_view s) : s(s), pos(0) { } size_t read(char * data, size_t len) override; }; /** * A sink that writes all incoming data to two other sinks. */ struct TeeSink : Sink { Sink & sink1, & sink2; TeeSink(Sink & sink1, Sink & sink2) : sink1(sink1), sink2(sink2) { } virtual void operator () (std::string_view data) override { sink1(data); sink2(data); } }; /** * Adapter class of a Source that saves all data read to a sink. 
*/ struct TeeSource : Source { Source & orig; Sink & sink; TeeSource(Source & orig, Sink & sink) : orig(orig), sink(sink) { } size_t read(char * data, size_t len) override { size_t n = orig.read(data, len); sink({data, n}); return n; } }; /** * A reader that consumes the original Source until 'size'. */ struct SizedSource : Source { Source & orig; size_t remain; SizedSource(Source & orig, size_t size) : orig(orig), remain(size) { } size_t read(char * data, size_t len) override { if (this->remain <= 0) { throw EndOfFile("sized: unexpected end-of-file"); } len = std::min(len, this->remain); size_t n = this->orig.read(data, len); this->remain -= n; return n; } /** * Consume the original source until no remain data is left to consume. */ size_t drainAll() { std::vector buf(8192); size_t sum = 0; while (this->remain > 0) { size_t n = read(buf.data(), buf.size()); sum += n; } return sum; } }; /** * A sink that that just counts the number of bytes given to it */ struct LengthSink : Sink { uint64_t length = 0; void operator () (std::string_view data) override { length += data.size(); } }; /** * Convert a function into a sink. */ struct LambdaSink : Sink { typedef std::function lambda_t; lambda_t lambda; LambdaSink(const lambda_t & lambda) : lambda(lambda) { } void operator () (std::string_view data) override { lambda(data); } }; /** * Convert a function into a source. 
*/ struct LambdaSource : Source { typedef std::function lambda_t; lambda_t lambda; LambdaSource(const lambda_t & lambda) : lambda(lambda) { } size_t read(char * data, size_t len) override { return lambda(data, len); } }; /** * Chain two sources together so after the first is exhausted, the second is * used */ struct ChainSource : Source { Source & source1, & source2; bool useSecond = false; ChainSource(Source & s1, Source & s2) : source1(s1), source2(s2) { } size_t read(char * data, size_t len) override; }; struct GeneratorSource : Source { GeneratorSource(Generator && g) : g(std::move(g)) {} virtual size_t read(char * data, size_t len) override { // we explicitly do not poll the generator multiple times to fill the // buffer, only to produce some output at all. this is allowed by the // semantics of read(), only operator() must fill the buffer entirely while (!buf.size()) { if (auto next = g.next()) { buf = *next; } else { throw EndOfFile("coroutine has finished"); } } len = std::min(len, buf.size()); memcpy(data, buf.data(), len); buf = buf.subspan(len); return len; } private: Generator g; Bytes buf{}; }; inline Sink & operator<<(Sink & sink, Generator && g) { while (auto buffer = g.next()) { sink(std::string_view(buffer->data(), buffer->size())); } return sink; } struct SerializingTransform; using WireFormatGenerator = Generator; struct SerializingTransform { std::array buf; Bytes operator()(uint64_t n) { buf[0] = n & 0xff; buf[1] = (n >> 8) & 0xff; buf[2] = (n >> 16) & 0xff; buf[3] = (n >> 24) & 0xff; buf[4] = (n >> 32) & 0xff; buf[5] = (n >> 40) & 0xff; buf[6] = (n >> 48) & 0xff; buf[7] = (unsigned char) (n >> 56) & 0xff; return {reinterpret_cast(buf.begin()), 8}; } static Bytes padding(size_t unpadded) { return Bytes("\0\0\0\0\0\0\0", unpadded % 8 ? 8 - unpadded % 8 : 0); } // opt in to generator chaining. without this co_yielding // another generator of any type will cause a type error. 
auto operator()(Generator && g) { return std::move(g); } // only choose this for *exactly* char spans, do not allow implicit // conversions. this would cause ambiguities with strings literals, // and resolving those with more string-like overloads needs a lot. template requires std::same_as> || std::same_as> Bytes operator()(Span s) { return s; } WireFormatGenerator operator()(std::string_view s); WireFormatGenerator operator()(const Strings & s); WireFormatGenerator operator()(const StringSet & s); WireFormatGenerator operator()(const Error & s); }; void writePadding(size_t len, Sink & sink); inline Sink & operator<<(Sink & sink, uint64_t u) { return sink << [&]() -> WireFormatGenerator { co_yield u; }(); } inline Sink & operator<<(Sink & sink, std::string_view s) { return sink << [&]() -> WireFormatGenerator { co_yield s; }(); } inline Sink & operator<<(Sink & sink, const Strings & s) { return sink << [&]() -> WireFormatGenerator { co_yield s; }(); } inline Sink & operator<<(Sink & sink, const StringSet & s) { return sink << [&]() -> WireFormatGenerator { co_yield s; }(); } inline Sink & operator<<(Sink & sink, const Error & ex) { return sink << [&]() -> WireFormatGenerator { co_yield ex; }(); } MakeError(SerialisationError, Error); template T readNum(Source & source) { unsigned char buf[8]; source((char *) buf, sizeof(buf)); auto n = readLittleEndian(buf); if (n > (uint64_t) std::numeric_limits::max()) throw SerialisationError("serialised integer %d is too large for type '%s'", n, typeid(T).name()); return (T) n; } inline unsigned int readInt(Source & source) { return readNum(source); } inline uint64_t readLongLong(Source & source) { return readNum(source); } void readPadding(size_t len, Source & source); size_t readString(char * buf, size_t max, Source & source); std::string readString(Source & source, size_t max = std::numeric_limits::max()); template T readStrings(Source & source); Source & operator >> (Source & in, std::string & s); template Source & 
operator >> (Source & in, T & n) { n = readNum(in); return in; } template Source & operator >> (Source & in, bool & b) { b = readNum(in); return in; } Error readError(Source & source); /** * An adapter that converts a std::basic_istream into a source. */ struct StreamToSourceAdapter : Source { std::shared_ptr> istream; StreamToSourceAdapter(std::shared_ptr> istream) : istream(istream) { } size_t read(char * data, size_t len) override { if (!istream->read(data, len)) { if (istream->eof()) { if (istream->gcount() == 0) throw EndOfFile("end of file"); } else throw Error("I/O error in StreamToSourceAdapter"); } return istream->gcount(); } }; /** * A source that reads a distinct format of concatenated chunks back into its * logical form, in order to guarantee a known state to the original stream, * even in the event of errors. * * Use with FramedSink, which also allows the logical stream to be terminated * in the event of an exception. */ struct FramedSource : Source { Source & from; bool eof = false; std::vector pending; size_t pos = 0; FramedSource(Source & from) : from(from) { } ~FramedSource() { if (!eof) { while (true) { auto n = readInt(from); if (!n) break; std::vector data(n); from(data.data(), n); } } } size_t read(char * data, size_t len) override { if (eof) throw EndOfFile("reached end of FramedSource"); if (pos >= pending.size()) { size_t len = readInt(from); if (!len) { eof = true; return 0; } pending = std::vector(len); pos = 0; from(pending.data(), len); } auto n = std::min(len, pending.size() - pos); memcpy(data, pending.data() + pos, n); pos += n; return n; } }; /** * Write as chunks in the format expected by FramedSource. * * The exception_ptr reference can be used to terminate the stream when you * detect that an error has occurred on the remote end. 
*/
struct FramedSink : nix::BufferedSink
{
    BufferedSink & to;
    std::exception_ptr & ex;

    FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex)
    { }

    /**
     * Write the terminating zero to `to` and flush it. Failures are
     * swallowed because a destructor must not throw.
     *
     * NOTE(review): this does not call this sink's own flush(), so data
     * still sitting in its buffer is not sent from here — presumably
     * callers flush before destruction; confirm.
     */
    ~FramedSink()
    {
        try {
            to << 0;
            to.flush();
        } catch (...) {
            ignoreException();
        }
    }

    /**
     * Emit one frame to `to`: the size of `data`, then `data` itself.
     */
    void writeUnbuffered(std::string_view data) override
    {
        /* Don't send more data if the remote has
           encountered an error. */
        if (ex) {
            /* Clear the shared slot before rethrowing. */
            auto ex2 = ex;
            ex = nullptr;
            std::rethrow_exception(ex2);
        }
        to << data.size();
        to(data);
    }
};

/* Disabling GC when entering a coroutine (without the boehm patch).
   mutable to avoid boehm gc dependency in libutil.
 */
extern std::shared_ptr<void> (*create_coro_gc_hook)();

}