Wondering if anyone here can help me with an interesting issue with demultiplexing a "change" observable for an array value. I have changes (Add(initial value) | Set(index, value) ...), and on top of that I want to maintain a persistent observable for each index in the array.
So I have made a scan that creates a new observable for each Add, itself derived from changes, and appends it to an array of existing observables. It then merges all of those, emitting an observable that should join the values coming from all the channels. The scan is receiving the changes, but it looks like the inner "index" observers never do. So, how do you deal with dynamic arrays if you hope to have "applicative" observables?
TiA
Alex
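For reference, the array semantics described above can be written as a plain fold, with no observables involved. This is only a sketch in TypeScript under my reading of the post: the `Change` type, `applyChange`, and `history` are hypothetical names, and I am assuming that Add appends a value and that Set/Index replaces the value at the given index.

```typescript
// Hypothetical TypeScript mirror of the post's change variants.
type Change =
  | { kind: "add"; value: string }
  | { kind: "index"; index: number; value: string };

// Assumed semantics: "add" appends, "index" replaces the element at that index.
function applyChange(state: readonly string[], change: Change): string[] {
  switch (change.kind) {
    case "add":
      return [...state, change.value];
    case "index":
      return state.map((old, i) => (i === change.index ? change.value : old));
  }
}

// Fold the changes, keeping every intermediate state (a history of the array).
function history(changes: readonly Change[]): string[][] {
  let current: string[] = [];
  const states: string[][] = [current];
  for (const change of changes) {
    current = applyChange(current, change);
    states.push(current);
  }
  return states;
}

// The same four changes that the test pushes through the subject.
const result = history([
  { kind: "add", value: "nice" },
  { kind: "index", index: 0, value: "a" },
  { kind: "add", value: "smooth" },
  { kind: "index", index: 1, value: "b" },
]);
console.log(JSON.stringify(result));
// prints [[],["nice"],["a"],["a","smooth"],["a","b"]]
```

If that reading is right, this is the state history the observable pipeline would be expected to reproduce once the per-index observers receive their changes.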
describe("array spread scan", () => {
  itPromise("chooch", () => {
    let changes: Rxjs.t<'cc, Rxjs.source<[#Add(string) | #Index(int, string)]>, [#Add(string) | #Index(int, string)]> = Rxjs.Subject.makeEmpty()
    let state: Rxjs.t<'cs, 'source, array<string>> = Rxjs.Subject.makeBehavior([])
    // A temporary observable that excludes #Add events, so we can filter for a specific index later
    let changeelements = changes->Rxjs.pipe(takeOnlyIndexChanges)
    // Close over the changeelements observable and state: for each index we filter by index,
    // send those changes through the pipeline for that index, and combine them using withLatestFrom(state)
    let make = (index) => FieldArray.applyIndexChanges(state, changeelements, index)
    changes
    ->Rxjs.pipe(Rxjs.scan((acc, v: 'change, _index) => {
      switch v {
      // When an #Add event comes in, make a new observable for that index, stored in the acc array
      | #Add(value) => acc->Array.length->make->Array.append(acc, _)
      // #Index events leave the array of observables unchanged
      | #Index(_i, _) => acc
      }
    }, []))
    // Take the latest array of observables and merge the outputs of each into a single stream of (index, value)
    ->Rxjs.pipe(Rxjs.switchMap(Rxjs.mergeArray))
    ->Rxjs.pipe(Rxjs.withLatestFrom(state))
    ->Rxjs.pipe(Rxjs.map(applyElementChangeAtIndex))
    // Feed the new state back into the state BehaviorSubject for the next step
    ->Rxjs.subscribe_(state->Rxjs.Subject.toSubscriber)
    let hist = state->Dynamic.toHistory
    changes->Rxjs.Subscriber.next(#Add("nice"))
    changes->Rxjs.Subscriber.next(#Index(0, "a"))
    changes->Rxjs.Subscriber.next(#Add("smooth"))
    changes->Rxjs.Subscriber.next(#Index(1, "b"))
    changes->Rxjs.Subscriber.complete
    hist
    ->Promise.tap(res => expect(res)->toEqual([[]]))
  })
})
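One thing that may be worth checking (an assumption on my part, not something the post confirms): the scan emits on every change, including #Index, so the switchMap tears down and re-creates the inner subscriptions while the subject is still in the middle of delivering that same change. The sketch below uses a hand-rolled `TinySubject` in TypeScript, a hypothetical stand-in and not the RxJS Subject. It assumes a subject that delivers each value to a snapshot of its observers and skips any observer that was unsubscribed during the emission.

```typescript
// TinySubject is a hypothetical stand-in for illustration, not the RxJS Subject.
type Observer<T> = (value: T) => void;

class TinySubject<T> {
  private observers: Observer<T>[] = [];

  // Register an observer and return a function that removes it again.
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  next(value: T): void {
    // Assumption: deliver to a snapshot of the observers taken before emitting,
    // and skip observers that were removed while this emission was in progress.
    const snapshot = this.observers.slice();
    for (const observer of snapshot) {
      if (this.observers.includes(observer)) observer(value);
    }
  }
}

const changes = new TinySubject<string>();
const seenByInner: string[] = [];
let unsubscribeInner: () => void = () => {};

// Outer subscriber: on every change, drop the current inner subscription and
// create a fresh one on the same subject (roughly what switching does).
changes.subscribe(() => {
  unsubscribeInner();
  unsubscribeInner = changes.subscribe((v) => seenByInner.push(v));
});

changes.next("add nice");
changes.next("index 0 a");
console.log(seenByInner.length); // prints 0
```

In this model the old inner observer is already unsubscribed when its turn comes, and the new one is not in the snapshot, so neither sees the value. If that is what is happening here, one option might be to avoid re-subscribing on every change, for example by emitting only the newly created observable on #Add and flattening with something like RxJS's mergeAll, so that existing inner subscriptions stay alive across #Index events.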