From 4cf8766769502a1a55e827b41399e54ff90f0fbc Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 18 Jul 2024 09:52:42 +0100 Subject: [PATCH] Polish comments --- src/Propulsion.CosmosStore/CosmosStoreSink.fs | 4 ++-- src/Propulsion/Streams.fs | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 2904edff..cd45f510 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -120,8 +120,8 @@ module Internal = | Ok struct (Writer.Result.Ok pos', _stats) -> struct (streams.SetWritePos(stream, pos'), false) | Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) -> - streams.SetWritePos(stream, pos') |> ignore // throw away the events (but not the unfolds) - ValueNone, false // Don't declare progress yet, until any unfolds have been handled + streams.SetWritePos(stream, pos') |> ignore // Trim any events that conflict with stuff already in the store + ValueNone, false // Don't declare progress until any unfolds have been handled | Ok (Writer.Result.PrefixMissing _, _stats) -> streams.WritePos(stream), false | Error struct (exn, _stats) -> diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 9db6df8e..2c786ac2 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -493,7 +493,7 @@ module Scheduling = let sw = Stopwatch.start() member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts - // If we did not dispatch, we attempt ingestion of streams as a standalone task, but need to add to dispatch time to compensate for calcs below + // If we did not dispatch, we attempt ingestion of streams as a standalone task, but need to add to dispatch time to compensate for math below member x.RecordDispatchNone ts = x.RecordDispatch ts member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts @@ -667,7 +667,7 @@ module Scheduling = module Progress = - type [] BatchState = { markCompleted: unit -> unit; streamToRequiredIndex: Dictionary } + type [] BatchState = private { markCompleted: unit -> unit; streamToRequiredIndex: Dictionary } type ProgressState<'Pos>() = let pending = Queue() @@ -690,6 +690,8 @@ module Scheduling = member _.MarkStreamProgress(stream, index) = for x in pending do // example: when we reach position 1 on the stream (having handled event 0), and the required position was 1, we remove the requirement + // NOTE Any unfolds that accompany event 0 will also bear Index 0 + // NOTE 2: subsequent updates to Unfolds will bear the same Index of 0 until there is an Event with Index 1 let mutable requiredIndex = Unchecked.defaultof<_> if x.streamToRequiredIndex.TryGetValue(stream, &requiredIndex) && index >= requiredIndex then x.streamToRequiredIndex.Remove stream |> ignore