Skip to content

Commit

Permalink
Polish comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 18, 2024
1 parent 685de1f commit 4cf8766
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ module Internal =
| Ok struct (Writer.Result.Ok pos', _stats) ->
struct (streams.SetWritePos(stream, pos'), false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.SetWritePos(stream, pos') |> ignore // throw away the events (but not the unfolds)
ValueNone, false // Don't declare progress yet, until any unfolds have been handled
streams.SetWritePos(stream, pos') |> ignore // Trim any events that conflict with stuff already in the store
ValueNone, false // Don't declare progress until any unfolds have been handled
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream), false
| Error struct (exn, _stats) ->
Expand Down
6 changes: 4 additions & 2 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ module Scheduling =
let sw = Stopwatch.start()
member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts
member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts
// If we did not dispatch, we attempt ingestion of streams as a standalone task, but need to add to dispatch time to compensate for calcs below
// If we did not dispatch, we attempt ingestion of streams as a standalone task, but need to add to dispatch time to compensate for math below
member x.RecordDispatchNone ts = x.RecordDispatch ts
member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts
member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts
Expand Down Expand Up @@ -667,7 +667,7 @@ module Scheduling =

module Progress =

type [<Struct; NoComparison; NoEquality>] BatchState = { markCompleted: unit -> unit; streamToRequiredIndex: Dictionary<FsCodec.StreamName, int64> }
type [<Struct; NoComparison; NoEquality>] BatchState = private { markCompleted: unit -> unit; streamToRequiredIndex: Dictionary<FsCodec.StreamName, int64> }

type ProgressState<'Pos>() =
let pending = Queue<BatchState>()
Expand All @@ -690,6 +690,8 @@ module Scheduling =
member _.MarkStreamProgress(stream, index) =
for x in pending do
// example: when we reach position 1 on the stream (having handled event 0), and the required position was 1, we remove the requirement
// NOTE Any unfolds that accompany event 0 will also bear Index 0
// NOTE 2: subsequent updates to Unfolds will bear the same Index of 0 until there is an Event with Index 1
let mutable requiredIndex = Unchecked.defaultof<_>
if x.streamToRequiredIndex.TryGetValue(stream, &requiredIndex) && index >= requiredIndex then
x.streamToRequiredIndex.Remove stream |> ignore
Expand Down

0 comments on commit 4cf8766

Please sign in to comment.