linq - Using Reactive Extensions to Merge Chunked Messages
So I'm attempting to use Reactive Extensions to recompose chunked messages identified by an ID, and I'm having a problem terminating the final observable. I have a Message class which consists of an ID, a total size, a payload, a chunk number, and a type, and I have the following client-side code (note that I need to calculate the number of messages to take at runtime):
    (from messages in
         (from messageArgs in receive
          select serializer.Deserialize<Message>(
              new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
     group messages by messages.Id into grouped
     select grouped)
    .Subscribe(g =>
    {
        var cache = new List<Message>();
        // I need to calculate the number of messages to take at runtime
        g.TakeWhile(_ => (int) Math.Ceiling(MaxPayload / g.First().Size) < cache.Count)
         .Subscribe(cache.Add, _ => { /* rebuild message from the parts in the cache */ });
    });
First I create a grouped observable, filtering the messages by their unique ID, and then I try to cache the messages in each group until I have collected them all, sort them, and put them together. The above seems to block on g.First().
I need a way to calculate the number to take from the first (or any) of the messages that come through, but I'm having difficulty doing so. Any help?
First is a blocking operator (how else can it return T and not IObservable<T>?).
I think using Scan (which builds an aggregate over time) could be what you need. Using Scan, you can hide the "state" of your message re-construction in a "builder" object.
MessageBuilder.IsComplete returns true when the size of the messages it has received reaches MaxPayload (or whatever your requirements are). MessageBuilder.Build() then returns the reconstructed message.
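For illustration, here is a rough sketch of what such a builder might look like. This is only one possible shape, not a definitive implementation: the Message property names and types, the use of a string payload, and the assumption that Size holds the total length of the full payload are all guesses on my part, since the question only lists the fields. Adapt IsComplete to whatever your completion rule really is.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message shape, guessed from the fields listed in the question.
public class Message
{
    public string Id { get; set; }
    public int Size { get; set; }         // assumed: total length of the full payload
    public int ChunkNumber { get; set; }
    public string Payload { get; set; }   // assumed: payload is text
    public string Type { get; set; }
}

// Hypothetical builder that collects the parts of one message.
public class MessageBuilder
{
    // Keyed by chunk number, so parts come back out in order
    // even if they arrive out of order.
    private readonly SortedDictionary<int, Message> _parts =
        new SortedDictionary<int, Message>();

    public void AddPart(Message part)
    {
        // A repeated chunk number replaces the earlier copy.
        _parts[part.ChunkNumber] = part;
    }

    // True once the collected payload length reaches the declared total size.
    public bool IsComplete
    {
        get
        {
            if (_parts.Count == 0) return false;
            int expected = _parts.Values.First().Size;
            int received = _parts.Values.Sum(p => p.Payload.Length);
            return received >= expected;
        }
    }

    // Concatenates the payloads in chunk order into a single message.
    public Message Build()
    {
        if (!IsComplete)
            throw new InvalidOperationException("Not all parts have been received.");

        Message first = _parts.Values.First();
        return new Message
        {
            Id = first.Id,
            Size = first.Size,
            ChunkNumber = 0,
            Type = first.Type,
            Payload = string.Concat(_parts.Values.Select(p => p.Payload))
        };
    }
}
```

Because the parts are stored by chunk number, the separate sort step from the question is not needed; Build() simply reads them back in key order.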
I've also moved your "message building" code into a SelectMany, which keeps the built messages within the monad.
(Apologies for reformatting the code into extension methods; I find it difficult to read/write the mixed LINQ syntax.)
    receive
        .Select(messageArgs => serializer.Deserialize<Message>(
            new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
        .GroupBy(message => message.Id)
        .SelectMany(group =>
        {
            // Use the builder to "add" message parts
            return group.Scan(new MessageBuilder(), (builder, messagePart) =>
                {
                    builder.AddPart(messagePart);
                    return builder;
                })
                .SkipWhile(builder => !builder.IsComplete)
                .Select(builder => builder.Build());
        })
        .Subscribe(OnMessageReceived);