diff options
Diffstat (limited to 'src/libutil/serialise.hh')
-rw-r--r-- | src/libutil/serialise.hh | 93 |
1 files changed, 92 insertions, 1 deletions
diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 6f4f4c855..b41e58f33 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -69,6 +69,8 @@ struct Source virtual bool good() { return true; } + void drainInto(Sink & sink); + std::string drain(); }; @@ -340,7 +342,7 @@ T readNum(Source & source) ((uint64_t) buf[6] << 48) | ((uint64_t) buf[7] << 56); - if (n > std::numeric_limits<T>::max()) + if (n > (uint64_t)std::numeric_limits<T>::max()) throw SerialisationError("serialised integer %d is too large for type '%s'", n, typeid(T).name()); return (T) n; @@ -404,4 +406,93 @@ struct StreamToSourceAdapter : Source }; +/* A source that reads a distinct format of concatenated chunks back into its + logical form, in order to guarantee a known state to the original stream, + even in the event of errors. + + Use with FramedSink, which also allows the logical stream to be terminated + in the event of an exception. +*/ +struct FramedSource : Source +{ + Source & from; + bool eof = false; + std::vector<unsigned char> pending; + size_t pos = 0; + + FramedSource(Source & from) : from(from) + { } + + ~FramedSource() + { + if (!eof) { + while (true) { + auto n = readInt(from); + if (!n) break; + std::vector<unsigned char> data(n); + from(data.data(), n); + } + } + } + + size_t read(unsigned char * data, size_t len) override + { + if (eof) throw EndOfFile("reached end of FramedSource"); + + if (pos >= pending.size()) { + size_t len = readInt(from); + if (!len) { + eof = true; + return 0; + } + pending = std::vector<unsigned char>(len); + pos = 0; + from(pending.data(), len); + } + + auto n = std::min(len, pending.size() - pos); + memcpy(data, pending.data() + pos, n); + pos += n; + return n; + } +}; + +/* Write as chunks in the format expected by FramedSource. + + The exception_ptr reference can be used to terminate the stream when you + detect that an error has occurred on the remote end. +*/ +struct FramedSink : nix::BufferedSink +{ + BufferedSink & to; + std::exception_ptr & ex; + + FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) + { } + + ~FramedSink() + { + try { + to << 0; + to.flush(); + } catch (...) { + ignoreException(); + } + } + + void write(const unsigned char * data, size_t len) override + { + /* Don't send more data if the remote has + encountered an error. */ + if (ex) { + auto ex2 = ex; + ex = nullptr; + std::rethrow_exception(ex2); + } + to << len; + to(data, len); + }; +}; + + } |