Friday, April 2, 2021

How to combine two different GroupedStreams in Rx.NET?

This question is similar, but it does not apply to my case: that user needed to merge observable streams from the same IGroupedObservable, while I want to combine streams from different groups.

I have the following structures and streams:

type A = {
    Id: int
    Value: int
}

type B = {
    Id: int
    Value: int
}

// subjects to test input, just any source of As and Bs
let subjectA: Subject<A> = Subject.broadcast
let subjectB: Subject<B> = Subject.broadcast

// grouped streams
let groupedA: IObservable<IGroupedObservable<int, A>> = Observable.groupBy (fun a -> a.Id) subjectA
let groupedB: IObservable<IGroupedObservable<int, B>> = Observable.groupBy (fun b -> b.Id) subjectB

I want to somehow merge the inner observables of A and B whenever a group from groupedA and a group from groupedB have the same Key, and get an observable of (A, B) pairs where A.Id = B.Id.

The signature I want is something like IObservable<IGroupedObservable<int, A>> -> IObservable<IGroupedObservable<int, B>> -> IObservable<IGroupedObservable<int, (A, B)>> where for all (A, B), A.Id = B.Id

I tried several variations of combineLatest, groupJoin, filter and map, but without success.

I'm using F# with Rx.NET and FSharp.Control.Reactive, but if you know the answer in C# (or any language, really), please post it.

https://stackoverflow.com/questions/66911017/how-to-combine-two-different-groupedstreams-in-rx-net April 02, 2021 at 04:15AM
