diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 5c32ad05..011dbf4f 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -65,7 +65,7 @@ module Internal = let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task { let i = StreamSpan.index span - let n = StreamSpan.nextIndex span + let n = StreamSpan.next span span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody diff --git a/src/Propulsion.Kafka/ProducerSinks.fs b/src/Propulsion.Kafka/ProducerSinks.fs index 24b4a264..b3e6affa 100644 --- a/src/Propulsion.Kafka/ProducerSinks.fs +++ b/src/Propulsion.Kafka/ProducerSinks.fs @@ -45,7 +45,7 @@ type StreamsProducerSink = | _ -> () do! producer.Produce(key, message, ct = ct) | ValueNone -> () - return struct (outcome, Events.nextIndex span) + return struct (outcome, Events.next span) } Sync.Factory.StartAsync ( log, maxReadAhead, maxConcurrentStreams, handle, diff --git a/src/Propulsion/Sinks.fs b/src/Propulsion/Sinks.fs index 35c4c7dc..a4160cf2 100644 --- a/src/Propulsion/Sinks.fs +++ b/src/Propulsion/Sinks.fs @@ -17,7 +17,7 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit> module Events = /// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully) - let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex + let next: Event[] -> int64 = Streams.StreamSpan.next /// The Index of the first event as supplied to this handler let index: Event[] -> int64 = Streams.StreamSpan.index diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 7eac9a33..76884382 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -103,22 +103,21 @@ module StreamSpan = trimmed, metrics eventSize trimmed let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index - let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) = + let inline next (span: FsCodec.ITimelineEvent<'F>[]) = let l = span[span.Length - 1] if l.IsUnfold then l.Index else l.Index + 1L - let inline dropBeforeIndex min = function + let inline dropBefore min = function | [||] as xs -> xs - | xs when nextIndex xs < min -> Array.empty + | xs when next xs < min -> Array.empty | xs -> match index xs with | xi when xi = min -> xs | xi -> xs |> Array.skip (min - xi |> int) - let merge min (spans: FsCodec.ITimelineEvent<_>[][]) = let candidates = [| for span in spans do if span <> null then - match dropBeforeIndex min span with + match dropBefore min span with | [||] -> () | xs -> xs |] if candidates.Length = 0 then null @@ -132,7 +131,7 @@ module StreamSpan = for i in 1 .. candidates.Length - 1 do let x = candidates[i] let xIndex = index x - let accNext = nextIndex acc + let accNext = next acc if xIndex > accNext then // Gap match acc |> Array.filter (_.IsUnfold >> not) with | [||] -> () @@ -141,8 +140,8 @@ module StreamSpan = buffer.Add eventsOnly acc <- x // Overlapping, join - elif nextIndex x > accNext then - match dropBeforeIndex accNext x with + elif next x > accNext then + match dropBefore accNext x with | [||] -> () | news -> acc <- [| for x in acc do if not x.IsUnfold then x diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index b9131d43..c98b2375 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -127,7 +127,7 @@ module Helpers = do! handler (getConsumer()) (deserialize consumerId event) (log: ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length) let ts = Stopwatch.elapsed ts - return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.nextIndex x.span), ts) } } + return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.next x.span), ts) } } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer = @@ -165,7 +165,7 @@ module Helpers = let handle _ (span: Propulsion.Sinks.Event[]) = async { for event in span do do! handler (getConsumer()) (deserialize consumerId event) - return (), Propulsion.Sinks.Events.nextIndex span } + return (), Propulsion.Sinks.Events.next span } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer = diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 9a062c1b..2e65573c 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -84,7 +84,7 @@ let ``It processes events for a category`` () = task { test <@ Array.chooseV Simple.codec.Decode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @> if handled.Count >= 2000 then stop () - return struct ((), Propulsion.Sinks.Events.nextIndex events) } + return struct ((), Propulsion.Sinks.Events.next events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let source = MessageDbSource( log, TimeSpan.FromMinutes 1, @@ -132,7 +132,7 @@ let ``It doesn't read the tail event again`` () = task { let stats = stats log let handle _ events _ = task { - return struct ((), Propulsion.Sinks.Events.nextIndex events) } + return struct ((), Propulsion.Sinks.Events.next events) } use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats) let batchSize = 10 let source = MessageDbSource( diff --git a/tests/Propulsion.Tests/SinkHealthTests.fs b/tests/Propulsion.Tests/SinkHealthTests.fs index a8060af0..be73ad20 100644 --- a/tests/Propulsion.Tests/SinkHealthTests.fs +++ b/tests/Propulsion.Tests/SinkHealthTests.fs @@ -30,7 +30,7 @@ type Scenario(testOutput) = return failwith "transient" else do! Async.Sleep (TimeSpan.FromSeconds 1) - return (), Propulsion.Sinks.Events.nextIndex events } + return (), Propulsion.Sinks.Events.next events } let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats) let dispose () = sink.Stop() diff --git a/tests/Propulsion.Tests/SourceTests.fs b/tests/Propulsion.Tests/SourceTests.fs index 7d5f7eac..5df1923f 100644 --- a/tests/Propulsion.Tests/SourceTests.fs +++ b/tests/Propulsion.Tests/SourceTests.fs @@ -14,7 +14,7 @@ type Scenario(testOutput) = let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with member _.HandleOk x = () member _.HandleExn(log, x) = () } - let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.nextIndex events) } + let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.next events) } let sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats) let dispose () = sink.Stop() diff --git a/tests/Propulsion.Tests/StreamStateTests.fs b/tests/Propulsion.Tests/StreamStateTests.fs index c961443d..3a26125d 100644 --- a/tests/Propulsion.Tests/StreamStateTests.fs +++ b/tests/Propulsion.Tests/StreamStateTests.fs @@ -5,123 +5,84 @@ open Propulsion.Streams open Swensen.Unquote open Xunit -module FsCodec301 = // Not yet merged, https://github.com/jet/FsCodec/pull/123 - open FsCodec - open System - /// An Event or Unfold that's been read from a Store and hence has a defined Index on the Event Timeline. - [] - type TimelineEvent2<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) = - - static member Create(index, eventType, data, ?meta, ?eventId, ?correlationId, ?causationId, ?timestamp, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> = - let isUnfold = defaultArg isUnfold false - let meta = match meta with Some x -> x | None -> Unchecked.defaultof<_> - let eventId = match eventId with Some x -> x | None -> Guid.Empty - let ts = match timestamp with Some ts -> ts | None -> DateTimeOffset.UtcNow - let size = defaultArg size 0 - TimelineEvent2(index, eventType, data, meta, eventId, Option.toObj correlationId, Option.toObj causationId, ts, isUnfold, Option.toObj context, size) :> _ - - static member Create(index, inner: IEventData<'Format>, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> = - let isUnfold = defaultArg isUnfold false - let size = defaultArg size 0 - TimelineEvent2(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _ - - override _.ToString() = - let t = if isUnfold then "Unfold" else "Event" - $"{t} {eventType} @{index} {context}" - interface ITimelineEvent<'Format> with - member _.Index = index - member _.IsUnfold = isUnfold - member _.Context = context - member _.Size = size - member _.EventType = eventType - member _.Data = data - member _.Meta = meta - member _.EventId = eventId - member _.CorrelationId = correlationId - member _.CausationId = causationId - member _.Timestamp = timestamp -open FsCodec301 - let canonicalTime = System.DateTimeOffset.UtcNow let mk_ p c seg uc: FsCodec.ITimelineEvent[] = - let mk id et isUnfold = TimelineEvent2.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg) + let mk id et isUnfold = FsCodec.Core.TimelineEvent.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg) [| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |] let mk p c = mk_ p c 0 0 -let merge = StreamSpan.merge let isSame = LanguagePrimitives.PhysicalEquality -let dropBeforeIndex = StreamSpan.dropBeforeIndex let is (xs: FsCodec.ITimelineEvent[][]) (res: FsCodec.ITimelineEvent[][]) = (xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y) || x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType)) let [] nothing () = - let r = merge 0L [| mk 0L 0; mk 0L 0 |] + let r = StreamSpan.merge 0L [| mk 0L 0; mk 0L 0 |] test <@ isSame null r @> let [] synced () = - let r = merge 1L [| mk 0L 1; mk 0L 0 |] + let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 0 |] test <@ isSame null r @> let [] ``no overlap`` () = - let r = merge 0L [| mk 0L 1; mk 2L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 2L 2 |] test <@ r |> is [| mk 0L 1; mk 2L 2 |] @> let [] overlap () = - let r = merge 0L [| mk 0L 1; mk 0L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 0L 2 |] test <@ r |> is [| mk 0L 2 |] @> let [] ``remove nulls`` () = - let r = merge 1L [| mk 0L 1; mk 0L 2 |] + let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 2 |] test <@ r |> is [| mk 1L 1 |] @> let [] adjacent () = - let r = merge 0L [| mk 0L 1; mk 1L 2 |] + let r = StreamSpan.merge 0L [| mk 0L 1; mk 1L 2 |] test <@ r |> is [| mk 0L 3 |] @> let [] ``adjacent to min`` () = - let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |] + let r = Array.map (StreamSpan.dropBefore 2L) [| mk 0L 1; mk 1L 2 |] test <@ r |> is [| [||]; mk 2L 1 |] @> let [] ``adjacent to min merge`` () = - let r = merge 2L [| mk 0L 1; mk 1L 2 |] + let r = StreamSpan.merge 2L [| mk 0L 1; mk 1L 2 |] test <@ r |> is [| mk 2L 1 |] @> let [] ``adjacent to min no overlap`` () = - let r = merge 2L [| mk 0L 1; mk 2L 1 |] + let r = StreamSpan.merge 2L [| mk 0L 1; mk 2L 1 |] test <@ r |> is [| mk 2L 1|] @> let [] ``adjacent trim`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2 |] + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2 |] test <@ r |> is [| mk 1L 1; mk 2L 2 |] @> let [] ``adjacent trim merge`` () = - let r = merge 1L [| mk 0L 2; mk 2L 2 |] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2 |] test <@ r |> is [| mk 1L 3 |] @> let [] ``adjacent trim append`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |] + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |] test <@ r |> is [| mk 1L 1; mk 2L 2; mk 5L 1 |] @> let [] ``adjacent trim append merge`` () = - let r = merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|] test <@ r |> is [| mk 1L 3; mk 5L 1 |] @> let [] ``mixed adjacent trim append`` () = - let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |] + let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |] test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2 |] @> let [] ``mixed adjacent trim append merge`` () = - let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|] + let r = StreamSpan.merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|] test <@ r |> is [| mk 1L 3; mk 5L 1 |] @> let [] fail () = - let r = merge 11614L [| null; mk 11614L 1 |] + let r = StreamSpan.merge 11614L [| null; mk 11614L 1 |] test <@ r |> is [| mk 11614L 1 |] @> let [] ``fail 2`` () = - let r = merge 11613L [| mk 11614L 1; null |] + let r = StreamSpan.merge 11613L [| mk 11614L 1; null |] test <@ r |> is [| mk 11614L 1 |] @> let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame @@ -137,7 +98,7 @@ let [] ``merges retain freshest unfolds, yield mk_ pos events seg unfolds pos <- pos + int64 events seg <- seg + 1 |] - let res = merge 0L input + let res = StreamSpan.merge 0L input // The only way to end up with a null output is by sending either no spans, or all empties if res = null then test <@ input |> Array.forall Array.isEmpty @> @@ -165,7 +126,7 @@ let [] ``merges retain freshest unfolds, | _ -> true @> // resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span - test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @> + test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.next x < StreamSpan.index y) @> let others = res |> Array.take (res.Length - 1) // Only the last span can have unfolds diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index a85d6591..0cc57326 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -206,7 +206,7 @@ type Stats(log: ILogger, statsInterval, stateInterval, logExternalStats) = let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async = async { let ham, spam = events |> Array.partition isValidEvent - return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events } + return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events } let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed" let run appName (c: Args.Configuration, p: ParseResults) = async { @@ -249,7 +249,7 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore - return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events } + return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events } Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, requireAll = requireAll) | SubCommand.Sync sa ->