linq - Using Reactives to Merge Chunked Messages -


so i'm attempting use reactives recompose chunked messages identified id , having problem terminating final observable. have message class consists of id, total size, payload, chunk number , type , have following client-side code:

i need calculate number of messages take @ runtime

(from messages in    (from messageargs in receive select serializer.deserialize<message>(new memorystream(encoding.utf8.getbytes(messageargs.message))))  group messages messages.id grouped select grouped) .subscribe(g => {     var cache = new list<message>();     g.takewhile((int) math.ceiling(maxpayload/g.first().size) < cache.count)       .subscribe(cache.add,      _ => { /* rebuild message parts cache */ }); }); 

first create grouped observable filtering messages unique id , trying cache messages in each group until have collected them all, sort them , put them together. above seems block on g.first().

i need way calculate number take first (or any) of messages come through having difficulty doing so. help?

first blocking operator (how else can return t , not iobservable<t>?)

i think using scan (which builds aggregate on time) need. using scan, can hide "state" of message re-construction in "builder" object.

messagebuilder.iscomplete returns true when size of messages has received reaches maxpayload (or whatever requirements are). messagebuilder.build() returns reconstructed message.

i've moved "message building" code selectmany, keeps built messages within monad.

(apologies reformatting code extension methods, find difficult read/write mixed linq syntax)

receive     .select(messageargs => serializer.deserialize<message>(         new memorystream(encoding.utf8.getbytes(messageargs.message))))     .groupby(message => message.id)     .selectmany(group =>     {         // use builder "add" message parts         return group.scan(new messagebuilder(), (builder, messagepart) =>         {             builder.addpart(messagepart);              return builder;         })         .skipwhile(builder => !builder.iscomplete)         .select(builder => builder.build());     })     .subscribe(onmessagereceived); 

Comments

Popular posts from this blog

javascript - Enclosure Memory Copies -

php - Replacing tags in braces, even nested tags, with regex -