aboutsummaryrefslogtreecommitdiff
path: root/src/libutil/serialise.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/serialise.cc')
-rw-r--r--src/libutil/serialise.cc57
1 files changed, 56 insertions, 1 deletions
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index d1a16b6ba..16f3476c2 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -201,6 +201,62 @@ static DefaultStackAllocator defaultAllocatorSingleton;
StackAllocator *StackAllocator::defaultAllocator = &defaultAllocatorSingleton;
+std::unique_ptr<FinishSink> sourceToSink(std::function<void(Source &)> fun)
+{
+ struct SourceToSink : FinishSink
+ {
+ typedef boost::coroutines2::coroutine<bool> coro_t;
+
+ std::function<void(Source &)> fun;
+ std::optional<coro_t::push_type> coro;
+
+ SourceToSink(std::function<void(Source &)> fun) : fun(fun)
+ {
+ }
+
+ std::string_view cur;
+
+ void operator () (std::string_view in) override
+ {
+ if (in.empty()) return;
+ cur = in;
+
+ if (!coro)
+ coro = coro_t::push_type(VirtualStackAllocator{}, [&](coro_t::pull_type & yield) {
+ LambdaSource source([&](char *out, size_t out_len) {
+ if (cur.empty()) {
+ yield();
+ if (yield.get()) {
+ return (size_t)0;
+ }
+ }
+
+ size_t n = std::min(cur.size(), out_len);
+ memcpy(out, cur.data(), n);
+ cur.remove_prefix(n);
+ return n;
+ });
+ fun(source);
+ });
+
+ if (!*coro) { abort(); }
+
+ if (!cur.empty()) (*coro)(false);
+ }
+
+ void finish() override
+ {
+ if (!coro) return;
+ if (!*coro) abort();
+ (*coro)(true);
+ if (*coro) abort();
+ }
+ };
+
+ return std::make_unique<SourceToSink>(fun);
+}
+
+
std::unique_ptr<Source> sinkToSource(
std::function<void(Sink &)> fun,
std::function<void()> eof)
@@ -212,7 +268,6 @@ std::unique_ptr<Source> sinkToSource(
std::function<void(Sink &)> fun;
std::function<void()> eof;
std::optional<coro_t::pull_type> coro;
- bool started = false;
SinkToSource(std::function<void(Sink &)> fun, std::function<void()> eof)
: fun(fun), eof(eof)