diff --git a/bundles/prototype/StreamChain/StreamChain.enc b/bundles/prototype/StreamChain/StreamChain.enc new file mode 100644 index 000000000..e0dba37d8 --- /dev/null +++ b/bundles/prototype/StreamChain/StreamChain.enc @@ -0,0 +1,29 @@ +bundle StreamChain where + +typedef A = int +typedef B = int +typedef Scons = embed struct scons* end + +def chain(sa:Stream A, f:A->B) : Stream B { + let + futa = embed Fut Scons (future_t*)#{sa}; end + fscons = \(scons: Scons) -> + if (embed bool scons_eos(encore_ctx(),(struct scons*)#{scons}); end) then + embed Scons scons_end(encore_ctx()); end + else { + let va = embed A scons_element(encore_ctx(),(struct scons*)#{scons}).i; end; + let nexta = embed Stream A scons_next(encore_ctx(),(struct scons*)#{scons}); end; + let vb = f(va); + let nextb = chain(nexta,f); + embed Scons + scons_put_fut(encore_ctx(),#{nextb}, + (encore_arg_t) {.i =#{vb}},ENCORE_PRIMITIVE); + end; + } + -- Chaining by embeded function instead of ~~> + futb = embed Stream B + future_t* futmk = future_mk(encore_ctx(), get_scons_type()); + (stream_t*)future_chain_actor(_ctx, #{futa}, futmk, #{fscons}); + end + in (embed Stream B (stream_t*)#{futb}; end) +} diff --git a/bundles/prototype/StreamChain/StreamChainTest.enc b/bundles/prototype/StreamChain/StreamChainTest.enc new file mode 100644 index 000000000..cb3499159 --- /dev/null +++ b/bundles/prototype/StreamChain/StreamChainTest.enc @@ -0,0 +1,32 @@ +import StreamChain + +class Main + def main() : void + let + sio = new StreamIO + f = \(x:int) -> x*10 + -- Test with small input + n = 5 + -- Test with medium input + -- n = 5000 + -- Test with 1M input + -- n = 1000000 + sa1 = sio.produceI(n) + sb1 = chain(sa1,f) + in { + -- Print out the result sb1 + print "Output stream: "; + while (not eos sa1) { + print(get sa1); + sa1 = getNext sa1; + }; + } + +class StreamIO { + -- create stream int + stream produceI(i:A) : A + while i>0 { + yield i; + i = i - 1; + } +} diff --git a/bundles/prototype/StreamChain/StreamChainTest.out b/bundles/prototype/StreamChain/StreamChainTest.out new file mode 100644 index 000000000..c63080ea0 --- /dev/null +++ b/bundles/prototype/StreamChain/StreamChainTest.out @@ -0,0 +1,6 @@ +Output stream: +50 +40 +30 +20 +10 diff --git a/src/runtime/stream/stream.c b/src/runtime/stream/stream.c index 957930f5b..fb5d1b906 100644 --- a/src/runtime/stream/stream.c +++ b/src/runtime/stream/stream.c @@ -31,6 +31,11 @@ static pony_type_t scons_type = { .trace = scons_trace, }; +pony_type_t *get_scons_type() +{ + return &scons_type; +} + void stream_trace(pony_ctx_t *ctx, void *p) { future_trace(ctx, p); @@ -97,3 +102,41 @@ bool stream_eos(pony_ctx_t *ctx, stream_t *s) struct scons *scons = future_get_actor(ctx, (future_t *)s).p; return scons->eos; } + +stream_t *stream_put_fut(pony_ctx_t *ctx, future_t* fut, stream_t *s, + encore_arg_t value, pony_type_t *type){ + ctx = pony_ctx(); + struct scons *scons = scons_mk(ctx,type); + scons->element = value; + scons->next = fut; + future_fulfil(ctx,(future_t *)s, (encore_arg_t){ .p = scons }); + return fut; +} + +bool scons_eos(pony_ctx_t *ctx, scons_t *scons){ + return scons->eos; +} + +encore_arg_t scons_element(pony_ctx_t *ctx, scons_t *scons){ + return scons->element; +} + +stream_t *scons_next(pony_ctx_t *ctx, scons_t *scons){ + return scons->next; +} + +scons_t *scons_end(pony_ctx_t *ctx){ + ctx = pony_ctx(); + struct scons *scons = scons_mk(ctx, NULL); + scons->eos = true; + return scons; +} + +scons_t *scons_put_fut(pony_ctx_t *ctx, stream_t *s, + encore_arg_t value, pony_type_t *type){ + ctx = pony_ctx(); + struct scons *scons = scons_mk(ctx,type); + scons->element = value; + scons->next = (future_t*)s; + return scons; +} diff --git a/src/runtime/stream/stream.h b/src/runtime/stream/stream.h index 54bfe8d68..e1c86c3fc 100644 --- a/src/runtime/stream/stream.h +++ b/src/runtime/stream/stream.h @@ -7,6 +7,36 @@ typedef void stream_t; #include "encore.h" +typedef struct scons scons_t; + +bool scons_eos(pony_ctx_t *ctx, scons_t *scons); + +encore_arg_t scons_element(pony_ctx_t *ctx, scons_t *scons); + +stream_t *scons_next(pony_ctx_t *ctx, scons_t *scons); + +scons_t *scons_end(pony_ctx_t *ctx); + +scons_t *scons_put_fut(pony_ctx_t *ctx, stream_t *s, + encore_arg_t value, pony_type_t *type); + +pony_type_t *get_scons_type(); + +void scons_trace(pony_ctx_t *ctx, void *p); + +/** + * Put a value in an EXISTING stream + * + * @param s A stream + * @param fut An EXISTING stream + * @param value The value to be put in the stream + * @param type The runtime type of \p value + * @return The (empty) head of \p s + */ +stream_t *stream_put_fut(pony_ctx_t *ctx, future_t* fut, stream_t *s, encore_arg_t value, pony_type_t *type); + +struct scons *scons_mk(pony_ctx_t *ctx, pony_type_t *type); + /** * Create a new stream *