Skip to content

Commit

Permalink
Polish batch stats
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 1, 2024
1 parent e4c741e commit 1227038
Showing 1 changed file with 20 additions and 23 deletions.
43 changes: 20 additions & 23 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ module Buffer =
if waitingCats.Any then log.Information(" Waiting Streams, KB {@readyStreams}", Seq.truncate 5 waitingStreams.StatsDescending)

[<NoComparison; NoEquality>]
type Batch private (onCompletion, reqs: Dictionary<FsCodec.StreamName, int64>, unfoldReqs: ISet<FsCodec.StreamName>) =
type Batch private (onCompletion, reqs: Dictionary<FsCodec.StreamName, int64>, unfoldReqs: ISet<FsCodec.StreamName>, eventsCount, unfoldsCount) =
static member Create(onCompletion, streamEvents: StreamEvent<'Format> seq) =
let streams, reqs, unfoldReqs = Streams<'Format>(), Dictionary<FsCodec.StreamName, int64>(), HashSet<FsCodec.StreamName>()
let mutable eventsCount, unfoldsCount = 0, 0
for struct (stream, eventsAndUnfolds) in streamEvents |> ValueTuple.groupWith Seq.toArray do
let unfolds, events = eventsAndUnfolds |> Array.partition _.IsUnfold
let mutable hwm = -1L
Expand All @@ -270,19 +271,23 @@ module Buffer =
let asBatch = [| event |]
streams.Merge(stream, asBatch)
hwm <- asBatch |> StreamSpan.next |> max hwm
eventsCount <- eventsCount + events.Length
match unfolds with
| [||] -> ()
| unfolds ->
unfoldsCount <- unfoldsCount + unfolds.Length
unfoldReqs.Add stream |> ignore
let next = unfolds |> StreamSpan.next
// Drop all but the last set
let unfolds = unfolds |> Array.filter (fun x -> x.Index = next)
hwm <- max hwm next
streams.Merge(stream, unfolds)
reqs.Add(stream, hwm)
struct (streams, Batch(onCompletion, reqs, unfoldReqs))
struct (streams, Batch(onCompletion, reqs, unfoldReqs, eventsCount, unfoldsCount))
member val OnCompletion = onCompletion
member val StreamsCount = reqs.Count
member val EventsCount = eventsCount
member val UnfoldsCount = unfoldsCount
member val Reqs = reqs :> seq<KeyValuePair<FsCodec.StreamName, int64>>
member val UnfoldReqs = unfoldReqs

Expand Down Expand Up @@ -607,8 +612,7 @@ module Scheduling =
let monitor, monitorInterval = Stats.Busy.Monitor(), IntervalTimer(TimeSpan.FromSeconds 1.)
let stateStats = Stats.StateStats()
let lats = LatencyStats()
let mutable cycles, batchesCompleted, batchesStarted, streamsStarted, streamsWrittenAhead = 0, 0, 0, 0, 0
let mutable eventsStarted, eventsWrittenAhead, unfoldsStarted, unfoldsWrittenAhead = 0, 0, 0, 0
let mutable cycles, batchesCompleted, batchesStarted, streams, skipped, unfolded, events, unfolds = 0, 0, 0, 0, 0, 0, 0, 0

member val Log = log
member val Latency = lats
Expand All @@ -619,9 +623,9 @@ module Scheduling =

member x.DumpStats(struct (dispatchActive, dispatchMax), struct (batchesWaiting, batchesRunning), abend) =
let batchesCompleted = System.Threading.Interlocked.Exchange(&batchesCompleted, 0)
log.Information("Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e {unfolds:n0}u skipped {streamsSkipped:n0}s {eventsSkipped:n0}e {unfoldsSkipped:n0}u completed {completed} Running {active}",
batchesWaiting, batchesStarted, streamsStarted, eventsStarted, unfoldsStarted, streamsWrittenAhead, eventsWrittenAhead, unfoldsWrittenAhead, batchesCompleted, batchesRunning)
batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; unfoldsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; unfoldsWrittenAhead <- 0; (*batchesCompleted <- 0*)
log.Information("Batches Waiting {waiting} Started {started} {streams:n0}s ({skipped:n0} skipped {streamsUnfolded:n0} unfolded) {events:n0}e {unfolds:n0}u | Completed {completed} Running {active}",
batchesWaiting, batchesStarted, streams, skipped, unfolded, events, unfolds, batchesCompleted, batchesRunning)
batchesStarted <- 0; streams <- 0; skipped <- 0; unfolded <- 0; events <- 0; unfolds <- 0; (*batchesCompleted <- 0*)
x.Timers.Dump log
log.Information("Scheduler {cycles} cycles {@states} Running {busy}/{processors}",
cycles, stateStats.StatsDescending, dispatchActive, dispatchMax)
Expand All @@ -636,14 +640,13 @@ module Scheduling =
member _.HasLongRunning = monitor.OldestFailing.TotalSeconds > longRunningThresholdS
member _.Classify sn = monitor.Classify(longRunningThresholdS, sn)

member _.RecordIngested(streams, skippedStreams, events, skippedEvents, unfolds, skippedUnfolds) =
member _.RecordIngested(streamsCount, streamsSkipped, streamsUnfolded, eventCount, unfoldCount) =
batchesStarted <- batchesStarted + 1
streamsStarted <- streamsStarted + streams
streamsWrittenAhead <- streamsWrittenAhead + skippedStreams
eventsStarted <- eventsStarted + events
unfoldsStarted <- unfoldsStarted + unfolds
eventsWrittenAhead <- eventsWrittenAhead + skippedEvents
unfoldsWrittenAhead <- unfoldsWrittenAhead + skippedUnfolds
streams <- streams + streamsCount
skipped <- skipped + streamsSkipped
unfolded <- unfolded + streamsUnfolded
events <- events + eventCount
unfolds <- unfolds + unfoldCount

member _.RecordBatchCompletion() =
System.Threading.Interlocked.Increment(&batchesCompleted) |> ignore
Expand Down Expand Up @@ -882,16 +885,10 @@ module Scheduling =
// Take an incoming batch of events, correlating it against our known stream state to yield a set of required work before we can complete/checkpoint it
let ingest (batch: Batch) =
let reqs = Dictionary()
let mutable events, unfolds, eventsSkipped, unfoldsSkipped = 0, 0, 0, 0
for item in batch.Reqs do
let isUnfold = batch.UnfoldReqs.Contains item.Key
match streams.ToProgressRequirement(item.Key, item.Value, isUnfold) with
| ValueNone ->
if isUnfold then unfoldsSkipped <- unfoldsSkipped + 1 else eventsSkipped <- eventsSkipped + 1
| ValueSome req ->
if isUnfold then unfolds <- unfolds + 1 else events <- events + 1
reqs[item.Key] <- req
stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, events, eventsSkipped, unfolds, unfoldsSkipped)
streams.ToProgressRequirement(item.Key, item.Value, batch.UnfoldReqs.Contains item.Key)
|> ValueOption.iter (fun req -> reqs[item.Key] <- req)
stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, batch.UnfoldReqs.Count, batch.EventsCount, batch.UnfoldsCount)
let onCompletion () =
batch.OnCompletion ()
stats.RecordBatchCompletion()
Expand Down

0 comments on commit 1227038

Please sign in to comment.