
Commit

Renames: nextIndex->next
bartelink committed Aug 1, 2024
1 parent 642d189 commit 50d6eb8
Showing 10 changed files with 38 additions and 78 deletions.
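In short, StreamSpan.nextIndex becomes StreamSpan.next and StreamSpan.dropBeforeIndex becomes StreamSpan.dropBefore, with the Propulsion.Sinks.Events facade renamed to match; handler code that used to report Events.nextIndex now reports Events.next as the position it has processed up to. A minimal sketch of a handler against the renamed facade (illustrative only, mirroring the tests in this diff; log and stats are assumed to be in scope):

let handle _stream (events: Propulsion.Sinks.Event[]) = async {
    // ... process the span ...
    // Events.next (formerly Events.nextIndex) is the Index the next invocation is expected to start from
    return (), Propulsion.Sinks.Events.next events }
let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats) // (log, maxReadAhead, maxConcurrentStreams, handle, stats)
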
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -65,7 +65,7 @@ module Internal =

let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
let i = StreamSpan.index span
- let n = StreamSpan.nextIndex span
+ let n = StreamSpan.next span
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds")
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka/ProducerSinks.fs
@@ -45,7 +45,7 @@ type StreamsProducerSink =
| _ -> ()
do! producer.Produce(key, message, ct = ct)
| ValueNone -> ()
- return struct (outcome, Events.nextIndex span)
+ return struct (outcome, Events.next span)
}
Sync.Factory.StartAsync
( log, maxReadAhead, maxConcurrentStreams, handle,
2 changes: 1 addition & 1 deletion src/Propulsion/Sinks.fs
@@ -17,7 +17,7 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit>
module Events =

/// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully)
- let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex
+ let next: Event[] -> int64 = Streams.StreamSpan.next
/// The Index of the first event as supplied to this handler
let index: Event[] -> int64 = Streams.StreamSpan.index

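For orientation, the contract of the two helpers above as a commented sketch (not part of the diff; assumes a contiguous span of plain events with no trailing unfolds):

// Given a span holding events at Indexes 2, 3 and 4:
// Events.index span = 2L   // where this invocation starts
// Events.next span  = 5L   // the position a successful invocation reports back, i.e. index + span.Length
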
15 changes: 7 additions & 8 deletions src/Propulsion/Streams.fs
@@ -103,22 +103,21 @@ module StreamSpan =
trimmed, metrics eventSize trimmed

let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
- let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) =
+ let inline next (span: FsCodec.ITimelineEvent<'F>[]) =
let l = span[span.Length - 1]
if l.IsUnfold then l.Index else l.Index + 1L
- let inline dropBeforeIndex min = function
+ let inline dropBefore min = function
| [||] as xs -> xs
- | xs when nextIndex xs < min -> Array.empty
+ | xs when next xs < min -> Array.empty
| xs ->
match index xs with
| xi when xi = min -> xs
| xi -> xs |> Array.skip (min - xi |> int)

let merge min (spans: FsCodec.ITimelineEvent<_>[][]) =
let candidates =
[| for span in spans do
if span <> null then
- match dropBeforeIndex min span with
+ match dropBefore min span with
| [||] -> ()
| xs -> xs |]
if candidates.Length = 0 then null
@@ -132,7 +131,7 @@
for i in 1 .. candidates.Length - 1 do
let x = candidates[i]
let xIndex = index x
- let accNext = nextIndex acc
+ let accNext = next acc
if xIndex > accNext then // Gap
match acc |> Array.filter (_.IsUnfold >> not) with
| [||] -> ()
@@ -141,8 +140,8 @@
buffer.Add eventsOnly
acc <- x
// Overlapping, join
- elif nextIndex x > accNext then
- match dropBeforeIndex accNext x with
+ elif next x > accNext then
+ match dropBefore accNext x with
| [||] -> ()
| news ->
acc <- [| for x in acc do if not x.IsUnfold then x
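The renamed helpers keep their existing semantics: next is one past the Index of the last event, unless the span ends in unfolds, in which case it is the unfolds' own Index; dropBefore drops events whose Index precedes the requested position, yielding an empty array when the whole span is stale. A small sketch (illustrative only; assumes open Propulsion.Streams, and mkEvent is a hypothetical helper mirroring the test fixtures further down):

let ts = System.DateTimeOffset.UtcNow
let mkEvent i : FsCodec.ITimelineEvent<string> = FsCodec.Core.TimelineEvent.Create(i, string i, null, timestamp = ts)
let span = [| for i in 2L..4L -> mkEvent i |] // events at Indexes 2, 3, 4
StreamSpan.index span         // 2L
StreamSpan.next span          // 5L: one past the last event
StreamSpan.dropBefore 4L span // just the event at Index 4
StreamSpan.dropBefore 9L span // [||]: the whole span precedes the requested position
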
4 changes: 2 additions & 2 deletions tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
@@ -127,7 +127,7 @@ module Helpers =
do! handler (getConsumer()) (deserialize consumerId event)
(log: ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length)
let ts = Stopwatch.elapsed ts
- return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.nextIndex x.span), ts) } }
+ return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.next x.span), ts) } }
let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.)
let messageIndexes = StreamNameSequenceGenerator()
let consumer =
@@ -165,7 +165,7 @@ module Helpers =
let handle _ (span: Propulsion.Sinks.Event[]) = async {
for event in span do
do! handler (getConsumer()) (deserialize consumerId event)
- return (), Propulsion.Sinks.Events.nextIndex span }
+ return (), Propulsion.Sinks.Events.next span }
let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.)
let messageIndexes = StreamNameSequenceGenerator()
let consumer =
4 changes: 2 additions & 2 deletions tests/Propulsion.MessageDb.Integration/Tests.fs
@@ -84,7 +84,7 @@ let ``It processes events for a category`` () = task {
test <@ Array.chooseV Simple.codec.Decode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
if handled.Count >= 2000 then
stop ()
- return struct ((), Propulsion.Sinks.Events.nextIndex events) }
+ return struct ((), Propulsion.Sinks.Events.next events) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let source = MessageDbSource(
log, TimeSpan.FromMinutes 1,
@@ -132,7 +132,7 @@ let ``It doesn't read the tail event again`` () = task {
let stats = stats log

let handle _ events _ = task {
- return struct ((), Propulsion.Sinks.Events.nextIndex events) }
+ return struct ((), Propulsion.Sinks.Events.next events) }
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
let batchSize = 10
let source = MessageDbSource(
2 changes: 1 addition & 1 deletion tests/Propulsion.Tests/SinkHealthTests.fs
@@ -30,7 +30,7 @@ type Scenario(testOutput) =
return failwith "transient"
else
do! Async.Sleep (TimeSpan.FromSeconds 1)
- return (), Propulsion.Sinks.Events.nextIndex events }
+ return (), Propulsion.Sinks.Events.next events }
let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats)
let dispose () =
sink.Stop()
2 changes: 1 addition & 1 deletion tests/Propulsion.Tests/SourceTests.fs
@@ -14,7 +14,7 @@ type Scenario(testOutput) =
let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
with member _.HandleOk x = ()
member _.HandleExn(log, x) = () }
- let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.nextIndex events) }
+ let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.next events) }
let sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
let dispose () =
sink.Stop()
79 changes: 20 additions & 59 deletions tests/Propulsion.Tests/StreamStateTests.fs
@@ -5,123 +5,84 @@ open Propulsion.Streams
open Swensen.Unquote
open Xunit

- module FsCodec301 = // Not yet merged, https://github.com/jet/FsCodec/pull/123
- open FsCodec
- open System
- /// <summary>An Event or Unfold that's been read from a Store and hence has a defined <c>Index</c> on the Event Timeline.</summary>
- [<NoComparison; NoEquality>]
- type TimelineEvent2<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) =
-
- static member Create(index, eventType, data, ?meta, ?eventId, ?correlationId, ?causationId, ?timestamp, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
- let isUnfold = defaultArg isUnfold false
- let meta = match meta with Some x -> x | None -> Unchecked.defaultof<_>
- let eventId = match eventId with Some x -> x | None -> Guid.Empty
- let ts = match timestamp with Some ts -> ts | None -> DateTimeOffset.UtcNow
- let size = defaultArg size 0
- TimelineEvent2(index, eventType, data, meta, eventId, Option.toObj correlationId, Option.toObj causationId, ts, isUnfold, Option.toObj context, size) :> _
-
- static member Create(index, inner: IEventData<'Format>, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
- let isUnfold = defaultArg isUnfold false
- let size = defaultArg size 0
- TimelineEvent2(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _
-
- override _.ToString() =
- let t = if isUnfold then "Unfold" else "Event"
- $"{t} {eventType} @{index} {context}"
- interface ITimelineEvent<'Format> with
- member _.Index = index
- member _.IsUnfold = isUnfold
- member _.Context = context
- member _.Size = size
- member _.EventType = eventType
- member _.Data = data
- member _.Meta = meta
- member _.EventId = eventId
- member _.CorrelationId = correlationId
- member _.CausationId = causationId
- member _.Timestamp = timestamp
- open FsCodec301

let canonicalTime = System.DateTimeOffset.UtcNow

let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
- let mk id et isUnfold = TimelineEvent2.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
+ let mk id et isUnfold = FsCodec.Core.TimelineEvent.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
[| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false
for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
let mk p c = mk_ p c 0 0
- let merge = StreamSpan.merge
let isSame = LanguagePrimitives.PhysicalEquality
- let dropBeforeIndex = StreamSpan.dropBeforeIndex
let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
(xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y)
|| x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))

let [<Fact>] nothing () =
- let r = merge 0L [| mk 0L 0; mk 0L 0 |]
+ let r = StreamSpan.merge 0L [| mk 0L 0; mk 0L 0 |]
test <@ isSame null r @>

let [<Fact>] synced () =
- let r = merge 1L [| mk 0L 1; mk 0L 0 |]
+ let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 0 |]
test <@ isSame null r @>

let [<Fact>] ``no overlap`` () =
- let r = merge 0L [| mk 0L 1; mk 2L 2 |]
+ let r = StreamSpan.merge 0L [| mk 0L 1; mk 2L 2 |]
test <@ r |> is [| mk 0L 1; mk 2L 2 |] @>

let [<Fact>] overlap () =
- let r = merge 0L [| mk 0L 1; mk 0L 2 |]
+ let r = StreamSpan.merge 0L [| mk 0L 1; mk 0L 2 |]
test <@ r |> is [| mk 0L 2 |] @>

let [<Fact>] ``remove nulls`` () =
- let r = merge 1L [| mk 0L 1; mk 0L 2 |]
+ let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 2 |]
test <@ r |> is [| mk 1L 1 |] @>

let [<Fact>] adjacent () =
- let r = merge 0L [| mk 0L 1; mk 1L 2 |]
+ let r = StreamSpan.merge 0L [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| mk 0L 3 |] @>

let [<Fact>] ``adjacent to min`` () =
- let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |]
+ let r = Array.map (StreamSpan.dropBefore 2L) [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| [||]; mk 2L 1 |] @>

let [<Fact>] ``adjacent to min merge`` () =
- let r = merge 2L [| mk 0L 1; mk 1L 2 |]
+ let r = StreamSpan.merge 2L [| mk 0L 1; mk 1L 2 |]
test <@ r |> is [| mk 2L 1 |] @>

let [<Fact>] ``adjacent to min no overlap`` () =
- let r = merge 2L [| mk 0L 1; mk 2L 1 |]
+ let r = StreamSpan.merge 2L [| mk 0L 1; mk 2L 1 |]
test <@ r |> is [| mk 2L 1|] @>

let [<Fact>] ``adjacent trim`` () =
- let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2 |]
+ let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2 |]
test <@ r |> is [| mk 1L 1; mk 2L 2 |] @>

let [<Fact>] ``adjacent trim merge`` () =
- let r = merge 1L [| mk 0L 2; mk 2L 2 |]
+ let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2 |]
test <@ r |> is [| mk 1L 3 |] @>

let [<Fact>] ``adjacent trim append`` () =
- let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |]
+ let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |]
test <@ r |> is [| mk 1L 1; mk 2L 2; mk 5L 1 |] @>

let [<Fact>] ``adjacent trim append merge`` () =
- let r = merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|]
+ let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|]
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>

let [<Fact>] ``mixed adjacent trim append`` () =
- let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |]
+ let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |]
test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2 |] @>

let [<Fact>] ``mixed adjacent trim append merge`` () =
- let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|]
+ let r = StreamSpan.merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|]
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>

let [<Fact>] fail () =
- let r = merge 11614L [| null; mk 11614L 1 |]
+ let r = StreamSpan.merge 11614L [| null; mk 11614L 1 |]
test <@ r |> is [| mk 11614L 1 |] @>

let [<Fact>] ``fail 2`` () =
- let r = merge 11613L [| mk 11614L 1; null |]
+ let r = StreamSpan.merge 11613L [| mk 11614L 1; null |]
test <@ r |> is [| mk 11614L 1 |] @>

let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame
@@ -137,7 +98,7 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
yield mk_ pos events seg unfolds
pos <- pos + int64 events
seg <- seg + 1 |]
- let res = merge 0L input
+ let res = StreamSpan.merge 0L input
// The only way to end up with a null output is by sending either no spans, or all empties
if res = null then
test <@ input |> Array.forall Array.isEmpty @>
@@ -165,7 +126,7 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
| _ -> true @>

// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
- test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @>
+ test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.next x < StreamSpan.index y) @>

let others = res |> Array.take (res.Length - 1)
// Only the last span can have unfolds
4 changes: 2 additions & 2 deletions tools/Propulsion.Tool/Sync.fs
@@ -206,7 +206,7 @@ type Stats(log: ILogger, statsInterval, stateInterval, logExternalStats) =

let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<Outcome * int64> = async {
let ham, spam = events |> Array.partition isValidEvent
- return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
+ return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events }

let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed"
let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
@@ -249,7 +249,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
- return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
+ return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events }
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats,
requireAll = requireAll)
| SubCommand.Sync sa ->
