diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs
index 7fce7fb0..bbd56b3c 100755
--- a/src/Propulsion/Streams.fs
+++ b/src/Propulsion/Streams.fs
@@ -259,9 +259,10 @@ module Buffer =
         if waitingCats.Any then log.Information(" Waiting Streams, KB {@readyStreams}", Seq.truncate 5 waitingStreams.StatsDescending)
 
     [<NoComparison; NoEquality>]
-    type Batch private (onCompletion, reqs: Dictionary<FsCodec.StreamName, int64>, unfoldReqs: ISet<FsCodec.StreamName>) =
+    type Batch private (onCompletion, reqs: Dictionary<FsCodec.StreamName, int64>, unfoldReqs: ISet<FsCodec.StreamName>, eventsCount, unfoldsCount) =
         static member Create(onCompletion, streamEvents: StreamEvent<'Format> seq) =
             let streams, reqs, unfoldReqs = Streams<'Format>(), Dictionary(), HashSet()
+            let mutable eventsCount, unfoldsCount = 0, 0
             for struct (stream, eventsAndUnfolds) in streamEvents |> ValueTuple.groupWith Seq.toArray do
                 let unfolds, events = eventsAndUnfolds |> Array.partition _.IsUnfold
                 let mutable hwm = -1L
@@ -270,9 +271,11 @@ module Buffer =
                     let asBatch = [| event |]
                     streams.Merge(stream, asBatch)
                     hwm <- asBatch |> StreamSpan.next |> max hwm
+                eventsCount <- eventsCount + events.Length
                 match unfolds with
                 | [||] -> ()
                 | unfolds ->
+                    unfoldsCount <- unfoldsCount + unfolds.Length
                     unfoldReqs.Add stream |> ignore
                     let next = unfolds |> StreamSpan.next
                     // Drop all but the last set
@@ -280,9 +283,11 @@ module Buffer =
                     hwm <- max hwm next
                     streams.Merge(stream, unfolds)
                 reqs.Add(stream, hwm)
-        struct (streams, Batch(onCompletion, reqs, unfoldReqs))
+        struct (streams, Batch(onCompletion, reqs, unfoldReqs, eventsCount, unfoldsCount))
         member val OnCompletion = onCompletion
         member val StreamsCount = reqs.Count
+        member val EventsCount = eventsCount
+        member val UnfoldsCount = unfoldsCount
         member val Reqs = reqs :> seq<KeyValuePair<FsCodec.StreamName, int64>>
         member val UnfoldReqs = unfoldReqs
@@ -607,8 +612,7 @@ module Scheduling =
         let monitor, monitorInterval = Stats.Busy.Monitor(), IntervalTimer(TimeSpan.FromSeconds 1.)
         let stateStats = Stats.StateStats()
         let lats = LatencyStats()
-        let mutable cycles, batchesCompleted, batchesStarted, streamsStarted, streamsWrittenAhead = 0, 0, 0, 0, 0
-        let mutable eventsStarted, eventsWrittenAhead, unfoldsStarted, unfoldsWrittenAhead = 0, 0, 0, 0
+        let mutable cycles, batchesCompleted, batchesStarted, streams, skipped, unfolded, events, unfolds = 0, 0, 0, 0, 0, 0, 0, 0
 
         member val Log = log
         member val Latency = lats
@@ -619,9 +623,9 @@ module Scheduling =
         member x.DumpStats(struct (dispatchActive, dispatchMax), struct (batchesWaiting, batchesRunning), abend) =
             let batchesCompleted = System.Threading.Interlocked.Exchange(&batchesCompleted, 0)
-            log.Information("Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e {unfolds:n0}u skipped {streamsSkipped:n0}s {eventsSkipped:n0}e {unfoldsSkipped:n0}u completed {completed} Running {active}",
-                batchesWaiting, batchesStarted, streamsStarted, eventsStarted, unfoldsStarted, streamsWrittenAhead, eventsWrittenAhead, unfoldsWrittenAhead, batchesCompleted, batchesRunning)
-            batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; unfoldsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; unfoldsWrittenAhead <- 0; (*batchesCompleted <- 0*)
+            log.Information("Batches Waiting {waiting} Started {started} {streams:n0}s ({skipped:n0} skipped {streamsUnfolded:n0} unfolded) {events:n0}e {unfolds:n0}u | Completed {completed} Running {active}",
+                batchesWaiting, batchesStarted, streams, skipped, unfolded, events, unfolds, batchesCompleted, batchesRunning)
+            batchesStarted <- 0; streams <- 0; skipped <- 0; unfolded <- 0; events <- 0; unfolds <- 0; (*batchesCompleted <- 0*)
             x.Timers.Dump log
             log.Information("Scheduler {cycles} cycles {@states} Running {busy}/{processors}",
                 cycles, stateStats.StatsDescending, dispatchActive, dispatchMax)
@@ -636,14 +640,13 @@ module Scheduling =
         member _.HasLongRunning = monitor.OldestFailing.TotalSeconds > longRunningThresholdS
         member _.Classify sn = monitor.Classify(longRunningThresholdS, sn)
 
-        member _.RecordIngested(streams, skippedStreams, events, skippedEvents, unfolds, skippedUnfolds) =
+        member _.RecordIngested(streamsCount, streamsSkipped, streamsUnfolded, eventCount, unfoldCount) =
             batchesStarted <- batchesStarted + 1
-            streamsStarted <- streamsStarted + streams
-            streamsWrittenAhead <- streamsWrittenAhead + skippedStreams
-            eventsStarted <- eventsStarted + events
-            unfoldsStarted <- unfoldsStarted + unfolds
-            eventsWrittenAhead <- eventsWrittenAhead + skippedEvents
-            unfoldsWrittenAhead <- unfoldsWrittenAhead + skippedUnfolds
+            streams <- streams + streamsCount
+            skipped <- skipped + streamsSkipped
+            unfolded <- unfolded + streamsUnfolded
+            events <- events + eventCount
+            unfolds <- unfolds + unfoldCount
 
         member _.RecordBatchCompletion() =
             System.Threading.Interlocked.Increment(&batchesCompleted) |> ignore
@@ -882,16 +885,10 @@ module Scheduling =
         // Take an incoming batch of events, correlating it against our known stream state to yield a set of required work before we can complete/checkpoint it
        let ingest (batch: Batch) =
            let reqs = Dictionary()
-            let mutable events, unfolds, eventsSkipped, unfoldsSkipped = 0, 0, 0, 0
            for item in batch.Reqs do
-                let isUnfold = batch.UnfoldReqs.Contains item.Key
-                match streams.ToProgressRequirement(item.Key, item.Value, isUnfold) with
-                | ValueNone ->
-                    if isUnfold then unfoldsSkipped <- unfoldsSkipped + 1 else eventsSkipped <- eventsSkipped + 1
-                | ValueSome req ->
-                    if isUnfold then unfolds <- unfolds + 1 else events <- events + 1
-                    reqs[item.Key] <- req
-            stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, events, eventsSkipped, unfolds, unfoldsSkipped)
+                streams.ToProgressRequirement(item.Key, item.Value, batch.UnfoldReqs.Contains item.Key)
+                |> ValueOption.iter (fun req -> reqs[item.Key] <- req)
+            stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, batch.UnfoldReqs.Count, batch.EventsCount, batch.UnfoldsCount)
            let onCompletion () = batch.OnCompletion ()
            stats.RecordBatchCompletion()
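
Note: the per-request event/unfold tallying that the old Scheduling.ingest loop performed is replaced above by counts captured once, at Batch.Create time, via Array.partition over each stream's items. A minimal standalone sketch of that counting approach follows; the Item type and countEventsAndUnfolds function are hypothetical names used for illustration only, not part of the patch or the Propulsion API.

// Illustrative-only sketch of counting events vs unfolds per batch up front.
type Item = { IsUnfold: bool }

let countEventsAndUnfolds (itemsByStream: (string * Item[]) list) =
    let mutable eventsCount, unfoldsCount = 0, 0
    for _stream, items in itemsByStream do
        // Split each stream's items the same way Batch.Create does, then accumulate totals
        let unfolds, events = items |> Array.partition _.IsUnfold
        eventsCount <- eventsCount + events.Length
        unfoldsCount <- unfoldsCount + unfolds.Length
    eventsCount, unfoldsCount

// Two streams: one with 2 events + 1 unfold, one with a single event => (3, 1)
countEventsAndUnfolds [
    "stream-a", [| { IsUnfold = false }; { IsUnfold = false }; { IsUnfold = true } |]
    "stream-b", [| { IsUnfold = false } |] ]
|> printfn "%A"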