Skip to content

Commit

Permalink
Cleanup Dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 23, 2024
1 parent 0e76a98 commit d310aec
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 122 deletions.
58 changes: 25 additions & 33 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@ module Internal =
| Duplicate of updatedPos: int64
| PartialDuplicate of updatedPos: int64
| PrefixMissing of gap: int * actualPos: int64
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (Result * StreamSpan.Metrics * Buffer.Revision), struct (exn * StreamSpan.Metrics)>) =
match res with
| stream, Ok (Result.Ok pos, _, _) ->
let logTo (log: ILogger) (stream: FsCodec.StreamName): Result<Dispatcher.ResProgressAndMetrics<Result>, Dispatcher.ExnAndMetrics> -> unit = function
| Ok (Result.Ok pos, _, _) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
| stream, Ok (Result.Duplicate updatedPos, _, _) ->
| Ok (Result.Duplicate updatedPos, _, _) ->
log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos)
| stream, Ok (Result.PartialDuplicate updatedPos, _, _) ->
| Ok (Result.PartialDuplicate updatedPos, _, _) ->
log.Information("Requeuing {stream} {pos}", stream, updatedPos)
| stream, Ok (Result.PrefixMissing (gap, pos), _, _) ->
| Ok (Result.PrefixMissing (gap, pos), _, _) ->
log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos)
| stream, Error (exn, _) ->
| Error (exn, malformed, _) ->
let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information
log.Write(level, exn, "Writing {stream} failed, retrying", stream)

Expand Down Expand Up @@ -103,48 +102,39 @@ module Internal =

type Dispatcher =

static member Create(log: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) =
static member Create(storeLog: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) =
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024)
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span revision ct = task {
let struct (trimmed, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
#if COSMOSV3
try let! res = Writer.write log eventsContext (StreamName.toString stream) trimmed ct
try let! wr = Writer.write storeLog eventsContext (StreamName.toString stream) trimmed ct
#else
try let! res = Writer.write log eventsContext stream trimmed ct
try let! wr = Writer.write storeLog eventsContext stream trimmed ct
#endif
return Ok struct (res, met, revision)
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (Writer.Result.Ok pos', met, revision) ->
streams.TrimEventsPriorTo(stream, pos') |> ignore
struct (Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome, false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _met, _revision) ->
streams.TrimEventsPriorTo(stream, pos') |> ValueOption.map Buffer.HandlerProgress.ofPos, false // Trim any events that conflict with stuff already in the store
| Ok (Writer.Result.PrefixMissing _, _met, _revision) ->
ValueNone, false
| Error struct (exn, _met) ->
let malformed = Writer.classify exn |> Writer.isMalformed
streams.MarkMalformed(stream, malformed) |> ignore
ValueNone, malformed
let struct (handlerProgress, malformed) = applyResultToStreamState res
Writer.logTo writerResultLog malformed (stream, res)
struct (res, handlerProgress)
let hp = wr |> function
| Writer.Result.Ok pos' -> Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome
| Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome
| Writer.Result.PrefixMissing _ -> ValueNone
return Ok struct (wr, hp, met)
with e -> return Error struct (e, Writer.classify e |> Writer.isMalformed, met) }
let interpretProgress = function
| Ok struct (_wr, hp, _met) as res -> struct (res, hp, false)
| Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress = interpretProgress)

type WriterResult = Internal.Writer.Result

type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics * Buffer.Revision), struct (exn * StreamSpan.Metrics)>(
inherit Scheduling.Stats<Dispatcher.ResProgressAndMetrics<WriterResult>, Dispatcher.ExnAndMetrics>(
log, statsInterval, stateInterval, ?failThreshold = failThreshold,
logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump)
let writerResultLog = log.ForContext<WriterResult>()
let mutable okStreams, okEvents, okUnfolds, okBytes = HashSet(), 0, 0, 0L
let mutable exnCats, exnStreams, exnEvents, exnUnfolds, exnBytes = Stats.Counters(), HashSet(), 0, 0, 0L
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
match message with
| { stream = stream; result = Ok (res, (es, us, bs), _revision) } ->
| { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } ->
okStreams.Add stream |> ignore
okEvents <- okEvents + es
okUnfolds <- okUnfolds + us
Expand All @@ -154,8 +144,9 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D nul
| WriterResult.Duplicate _ -> resultDup <- resultDup + 1
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message, force = (us <> 0))
| { stream = stream; result = Error (Exception.Inner exn, (es, us, bs)) } ->
Internal.Writer.logTo writerResultLog stream r
base.RecordOk(message, us <> 0)
| { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) as r } ->
exnCats.Ingest(StreamName.categorize stream)
exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
Expand All @@ -169,6 +160,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D nul
| Internal.Writer.ResultKind.TooLarge -> OutcomeKind.Tagged "tooLarge"
| Internal.Writer.ResultKind.Malformed -> OutcomeKind.Tagged "malformed"
| Internal.Writer.ResultKind.Other -> OutcomeKind.Exn
Internal.Writer.logTo writerResultLog stream r
base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es).ForContext("unfolds", us), exn)
override _.DumpStats() =
let results = resultOk + resultDup + resultPartialDup + resultPrefix
Expand Down
56 changes: 26 additions & 30 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ module Internal =
| PartialDuplicate of updatedPos: int64
| PrefixMissing of gap: int * actualPos: int64

let logTo (log: ILogger) (res: FsCodec.StreamName * Result<struct (Result * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>) =
match res with
| stream, Ok (Result.Ok pos, _) ->
let logTo (log: ILogger) (stream: FsCodec.StreamName): Result<Dispatcher.ResProgressAndMetrics<Result>, Dispatcher.ExnAndMetrics> -> unit = function
| Ok (Result.Ok pos, _, _) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
| stream, Ok (Result.Duplicate updatedPos, _) ->
| Ok (Result.Duplicate updatedPos, _, _) ->
log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos)
| stream, Ok (Result.PartialDuplicate updatedPos, _) ->
| Ok (Result.PartialDuplicate updatedPos, _, _) ->
log.Information("Requeuing {stream} {pos}", stream, updatedPos)
| stream, Ok (Result.PrefixMissing (gap, pos), _) ->
| Ok (Result.PrefixMissing (gap, pos), _, _) ->
log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos)
| stream, Error (exn, _) ->
| Error (exn, _, _) ->
log.Warning(exn,"Writing {stream} failed, retrying", stream)

let write (log: ILogger) (context: EventStoreContext) stream (span: Event[]) ct = task {
Expand All @@ -45,7 +44,7 @@ module Internal =
#else
let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _), ct)
#endif
let ress =
let res' =
match res with
| GatewaySyncResult.Written (Token.Unpack pos') ->
Result.Ok (pos'.streamVersion + 1L)
Expand All @@ -54,15 +53,14 @@ module Internal =
| actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual)
| actual when actual >= i + span.LongLength -> Result.Duplicate actual
| actual -> Result.PartialDuplicate actual
log.Debug("Result: {res}", ress)
return ress }
log.Debug("Result: {res}", res')
return res' }

type [<RequireQualifiedAccess>] ResultKind = TimedOut | Other

type Dispatcher =

static member Create(log: ILogger, storeLog, connections: _[], maxDop) =
let writerResultLog = log.ForContext<Writer.Result>()
static member Create(storeLog, connections: _[], maxDop) =
let mutable robin = 0

let attemptWrite stream span _revision ct = task {
Expand All @@ -71,32 +69,28 @@ module Internal =
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span ct
return Ok struct (res, met)
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.TrimEventsPriorTo(stream, pos')
| Ok (Writer.Result.PrefixMissing _, _stats) ->
ValueNone
| Error struct (_stats, _exn) ->
streams.MarkMalformed(stream, false)
let writePos = applyResultToStreamState res
Writer.logTo writerResultLog (stream, res)
struct (res, writePos |> ValueOption.map Buffer.HandlerProgress.ofPos)
let hp = res |> function
| Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome
| Writer.Result.PrefixMissing _ -> ValueNone
return Ok struct (res, hp, met)
with e -> return Error struct (e, false, met) }
let interpretProgress = function
| Ok struct (_res, hp, _met) as res -> struct (res, hp, false)
| Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed
Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, project = attemptWrite, interpretProgress = interpretProgress)

type WriterResult = Internal.Writer.Result

type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>(
inherit Scheduling.Stats<Dispatcher.ResProgressAndMetrics<WriterResult>, Dispatcher.ExnAndMetrics>(
log, statsInterval, stateInterval, ?failThreshold = failThreshold,
logExternalStats = defaultArg logExternalStats Log.InternalMetrics.dump)
let writerResultLog = log.ForContext<WriterResult>()
let mutable okStreams, okEvents, okBytes, exnCats, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, Stats.Counters(), HashSet(), 0 , 0L
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
match message with
| { stream = stream; result = Ok (res, (es, us, bs)) } ->
| { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } ->
okStreams.Add stream |> ignore
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
Expand All @@ -105,13 +99,15 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null
| WriterResult.Duplicate _ -> resultDup <- resultDup + 1
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message, force = (us <> 0))
| { stream = stream; result = Error (Exception.Inner exn, (es, _us, bs)) } ->
Internal.Writer.logTo writerResultLog stream r
base.RecordOk(message, us <> 0)
| { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, _us, bs)) as r } ->
exnCats.Ingest(StreamName.categorize stream)
exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
resultExn <- resultExn + 1
Internal.Writer.logTo writerResultLog stream r
base.RecordExn(message, OutcomeKind.classify exn, log.ForContext("stream", stream).ForContext("events", es), exn)
override _.DumpStats() =
let results = resultOk + resultDup + resultPartialDup + resultPrefix
Expand Down Expand Up @@ -139,7 +135,7 @@ type EventStoreSink =
// Tune the sleep time when there are no items to schedule or responses to process. Default 1ms.
?idleDelay,
?ingesterStateInterval, ?commitInterval): SinkPipeline =
let dispatcher = Internal.Dispatcher.Create(log, storeLog, connections, maxConcurrentStreams)
let dispatcher = Internal.Dispatcher.Create(storeLog, connections, maxConcurrentStreams)
let scheduler =
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?idleDelay = idleDelay)
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.Kafka/Consumers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ module Core =
static member Start<'Info, 'Outcome>
( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents,
prepare, maxDop, handle: Func<FsCodec.StreamName, Event[], CancellationToken, Task<struct ('Outcome * int64)>>,
stats: Scheduling.Stats<struct ('Outcome * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>,
stats: Scheduling.Stats<struct ('Outcome * StreamSpan.Metrics), Dispatcher.ExnAndMetrics>,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
let dumpStreams logStreamStates log =
logExternalState |> Option.iter (fun f -> f log)
Expand Down Expand Up @@ -346,11 +346,11 @@ type Factory private () =
select, handle: Func<Scheduling.Item<_>[], CancellationToken, Task<seq<struct (Result<int64, exn> * TimeSpan)>>>, stats,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
let handle (items: Scheduling.Item<EventBody>[]) ct
: Task<Scheduling.InternalRes<Result<struct (int64 * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>>[]> = task {
: Task<Scheduling.InternalRes<Result<Dispatcher.NextIndexAndMetrics, Dispatcher.ExnAndMetrics>>[]> = task {
let start = Stopwatch.timestamp ()
let inline err ts e (x: Scheduling.Item<_>) =
let met = StreamSpan.metrics Event.renderedSize x.span
Scheduling.InternalRes.create (x, ts, Result.Error struct (e, met))
Scheduling.InternalRes.create (x, ts, Result.Error struct (e, false, met))
try let! results = handle.Invoke(items, ct)
return Array.ofSeq (Seq.zip items results |> Seq.map (function
| item, (Ok index', ts) ->
Expand Down Expand Up @@ -393,7 +393,7 @@ type Factory private () =
// new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle)
handle: StreamState[] -> Async<seq<struct (Result<int64, exn> * TimeSpan)>>,
// The responses from each <c>handle</c> invocation are passed to <c>stats</c> for periodic emission
stats,
stats: Propulsion.Streams.Stats<unit>,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
let handle' xs ct = handle xs |> Async.executeAsTask ct
Factory.StartBatchedAsync<'Info>(log, config, consumeResultToInfo, infoToStreamEvents, select, handle', stats,
Expand Down
Loading

0 comments on commit d310aec

Please sign in to comment.