diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 8d26eae6..38e515d0 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -49,17 +49,16 @@ module Internal = | Duplicate of updatedPos: int64 | PartialDuplicate of updatedPos: int64 | PrefixMissing of gap: int * actualPos: int64 - let logTo (log: ILogger) malformed (res: StreamName * Result) = - match res with - | stream, Ok (Result.Ok pos, _, _) -> + let logTo (log: ILogger) (stream: FsCodec.StreamName): Result, Dispatcher.ExnAndMetrics> -> unit = function + | Ok (Result.Ok pos, _, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (Result.Duplicate updatedPos, _, _) -> + | Ok (Result.Duplicate updatedPos, _, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (Result.PartialDuplicate updatedPos, _, _) -> + | Ok (Result.PartialDuplicate updatedPos, _, _) -> log.Information("Requeuing {stream} {pos}", stream, updatedPos) - | stream, Ok (Result.PrefixMissing (gap, pos), _, _) -> + | Ok (Result.PrefixMissing (gap, pos), _, _) -> log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) - | stream, Error (exn, _) -> + | Error (exn, malformed, _) -> let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information log.Write(level, exn, "Writing {stream} failed, retrying", stream) @@ -103,48 +102,39 @@ module Internal = type Dispatcher = - static member Create(log: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) = + static member Create(storeLog: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) = let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024) - let writerResultLog = log.ForContext() let attemptWrite stream span revision ct = task { let struct (trimmed, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span #if COSMOSV3 - try let! res = Writer.write log eventsContext (StreamName.toString stream) trimmed ct + try let! wr = Writer.write storeLog eventsContext (StreamName.toString stream) trimmed ct #else - try let! res = Writer.write log eventsContext stream trimmed ct + try let! wr = Writer.write storeLog eventsContext stream trimmed ct #endif - return Ok struct (res, met, revision) - with e -> return Error struct (e, met) } - let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = - let applyResultToStreamState = function - | Ok struct (Writer.Result.Ok pos', met, revision) -> - streams.TrimEventsPriorTo(stream, pos') |> ignore - struct (Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome, false) - | Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _met, _revision) -> - streams.TrimEventsPriorTo(stream, pos') |> ValueOption.map Buffer.HandlerProgress.ofPos, false // Trim any events that conflict with stuff already in the store - | Ok (Writer.Result.PrefixMissing _, _met, _revision) -> - ValueNone, false - | Error struct (exn, _met) -> - let malformed = Writer.classify exn |> Writer.isMalformed - streams.MarkMalformed(stream, malformed) |> ignore - ValueNone, malformed - let struct (handlerProgress, malformed) = applyResultToStreamState res - Writer.logTo writerResultLog malformed (stream, res) - struct (res, handlerProgress) + let hp = wr |> function + | Writer.Result.Ok pos' -> Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome + | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome + | Writer.Result.PrefixMissing _ -> ValueNone + return Ok struct (wr, hp, met) + with e -> return Error struct (e, Writer.classify e |> Writer.isMalformed, met) } + let interpretProgress = function + | Ok struct (_wr, hp, _met) as res -> struct (res, hp, false) + | Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress = interpretProgress) type WriterResult = Internal.Writer.Result type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats, Dispatcher.ExnAndMetrics>( log, statsInterval, stateInterval, ?failThreshold = failThreshold, logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump) + let writerResultLog = log.ForContext() let mutable okStreams, okEvents, okUnfolds, okBytes = HashSet(), 0, 0, 0L let mutable exnCats, exnStreams, exnEvents, exnUnfolds, exnBytes = Stats.Counters(), HashSet(), 0, 0, 0L let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0 override _.Handle message = match message with - | { stream = stream; result = Ok (res, (es, us, bs), _revision) } -> + | { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } -> okStreams.Add stream |> ignore okEvents <- okEvents + es okUnfolds <- okUnfolds + us @@ -154,8 +144,9 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ resultDup <- resultDup + 1 | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message, force = (us <> 0)) - | { stream = stream; result = Error (Exception.Inner exn, (es, us, bs)) } -> + Internal.Writer.logTo writerResultLog stream r + base.RecordOk(message, us <> 0) + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) as r } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es @@ -169,6 +160,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ OutcomeKind.Tagged "tooLarge" | Internal.Writer.ResultKind.Malformed -> OutcomeKind.Tagged "malformed" | Internal.Writer.ResultKind.Other -> OutcomeKind.Exn + Internal.Writer.logTo writerResultLog stream r base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es).ForContext("unfolds", us), exn) override _.DumpStats() = let results = resultOk + resultDup + resultPartialDup + resultPrefix diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index a0d1b0a0..085bed8e 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -24,17 +24,16 @@ module Internal = | PartialDuplicate of updatedPos: int64 | PrefixMissing of gap: int * actualPos: int64 - let logTo (log: ILogger) (res: FsCodec.StreamName * Result) = - match res with - | stream, Ok (Result.Ok pos, _) -> + let logTo (log: ILogger) (stream: FsCodec.StreamName): Result, Dispatcher.ExnAndMetrics> -> unit = function + | Ok (Result.Ok pos, _, _) -> log.Information("Wrote {stream} up to {pos}", stream, pos) - | stream, Ok (Result.Duplicate updatedPos, _) -> + | Ok (Result.Duplicate updatedPos, _, _) -> log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) - | stream, Ok (Result.PartialDuplicate updatedPos, _) -> + | Ok (Result.PartialDuplicate updatedPos, _, _) -> log.Information("Requeuing {stream} {pos}", stream, updatedPos) - | stream, Ok (Result.PrefixMissing (gap, pos), _) -> + | Ok (Result.PrefixMissing (gap, pos), _, _) -> log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) - | stream, Error (exn, _) -> + | Error (exn, _, _) -> log.Warning(exn,"Writing {stream} failed, retrying", stream) let write (log: ILogger) (context: EventStoreContext) stream (span: Event[]) ct = task { @@ -45,7 +44,7 @@ module Internal = #else let! res = context.Sync(log, stream, i - 1L, span |> Array.map (fun span -> span :> _), ct) #endif - let ress = + let res' = match res with | GatewaySyncResult.Written (Token.Unpack pos') -> Result.Ok (pos'.streamVersion + 1L) @@ -54,15 +53,14 @@ module Internal = | actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual) | actual when actual >= i + span.LongLength -> Result.Duplicate actual | actual -> Result.PartialDuplicate actual - log.Debug("Result: {res}", ress) - return ress } + log.Debug("Result: {res}", res') + return res' } type [] ResultKind = TimedOut | Other type Dispatcher = - static member Create(log: ILogger, storeLog, connections: _[], maxDop) = - let writerResultLog = log.ForContext() + static member Create(storeLog, connections: _[], maxDop) = let mutable robin = 0 let attemptWrite stream span _revision ct = task { @@ -71,32 +69,28 @@ module Internal = let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096 let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span ct - return Ok struct (res, met) - with e -> return Error struct (e, met) } - let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = - let applyResultToStreamState = function - | Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) -> - streams.TrimEventsPriorTo(stream, pos') - | Ok (Writer.Result.PrefixMissing _, _stats) -> - ValueNone - | Error struct (_stats, _exn) -> - streams.MarkMalformed(stream, false) - let writePos = applyResultToStreamState res - Writer.logTo writerResultLog (stream, res) - struct (res, writePos |> ValueOption.map Buffer.HandlerProgress.ofPos) + let hp = res |> function + | Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos' -> Buffer.HandlerProgress.ofPos pos' |> ValueSome + | Writer.Result.PrefixMissing _ -> ValueNone + return Ok struct (res, hp, met) + with e -> return Error struct (e, false, met) } + let interpretProgress = function + | Ok struct (_res, hp, _met) as res -> struct (res, hp, false) + | Error struct (_exn, malformed, _met) as res -> res, ValueNone, malformed Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, project = attemptWrite, interpretProgress = interpretProgress) type WriterResult = Internal.Writer.Result type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [] ?failThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats, Dispatcher.ExnAndMetrics>( log, statsInterval, stateInterval, ?failThreshold = failThreshold, logExternalStats = defaultArg logExternalStats Log.InternalMetrics.dump) + let writerResultLog = log.ForContext() let mutable okStreams, okEvents, okBytes, exnCats, exnStreams, exnEvents, exnBytes = HashSet(), 0, 0L, Stats.Counters(), HashSet(), 0 , 0L let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0 override _.Handle message = match message with - | { stream = stream; result = Ok (res, (es, us, bs)) } -> + | { stream = stream; result = Ok (res, _hp, (es, us, bs)) as r } -> okStreams.Add stream |> ignore okEvents <- okEvents + es okBytes <- okBytes + int64 bs @@ -105,13 +99,15 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [ resultDup <- resultDup + 1 | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message, force = (us <> 0)) - | { stream = stream; result = Error (Exception.Inner exn, (es, _us, bs)) } -> + Internal.Writer.logTo writerResultLog stream r + base.RecordOk(message, us <> 0) + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, _us, bs)) as r } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es exnBytes <- exnBytes + int64 bs resultExn <- resultExn + 1 + Internal.Writer.logTo writerResultLog stream r base.RecordExn(message, OutcomeKind.classify exn, log.ForContext("stream", stream).ForContext("events", es), exn) override _.DumpStats() = let results = resultOk + resultDup + resultPartialDup + resultPrefix @@ -139,7 +135,7 @@ type EventStoreSink = // Tune the sleep time when there are no items to schedule or responses to process. Default 1ms. ?idleDelay, ?ingesterStateInterval, ?commitInterval): SinkPipeline = - let dispatcher = Internal.Dispatcher.Create(log, storeLog, connections, maxConcurrentStreams) + let dispatcher = Internal.Dispatcher.Create(storeLog, connections, maxConcurrentStreams) let scheduler = let dumpStreams logStreamStates _log = logStreamStates Event.storedSize Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?idleDelay = idleDelay) diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index b2ae9c4d..b96e854e 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -182,7 +182,7 @@ module Core = static member Start<'Info, 'Outcome> ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, prepare, maxDop, handle: Func>, - stats: Scheduling.Stats, + stats: Scheduling.Stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let dumpStreams logStreamStates log = logExternalState |> Option.iter (fun f -> f log) @@ -346,11 +346,11 @@ type Factory private () = select, handle: Func[], CancellationToken, Task * TimeSpan)>>>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let handle (items: Scheduling.Item[]) ct - : Task>[]> = task { + : Task>[]> = task { let start = Stopwatch.timestamp () let inline err ts e (x: Scheduling.Item<_>) = let met = StreamSpan.metrics Event.renderedSize x.span - Scheduling.InternalRes.create (x, ts, Result.Error struct (e, met)) + Scheduling.InternalRes.create (x, ts, Result.Error struct (e, false, met)) try let! results = handle.Invoke(items, ct) return Array.ofSeq (Seq.zip items results |> Seq.map (function | item, (Ok index', ts) -> @@ -393,7 +393,7 @@ type Factory private () = // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) handle: StreamState[] -> Async * TimeSpan)>>, // The responses from each handle invocation are passed to stats for periodic emission - stats, + stats: Propulsion.Streams.Stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = let handle' xs ct = handle xs |> Async.executeAsTask ct Factory.StartBatchedAsync<'Info>(log, config, consumeResultToInfo, infoToStreamEvents, select, handle', stats, diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index fc3061bc..04555020 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -320,7 +320,6 @@ module Scheduling = open Buffer type StreamStates<'Format>() = let states = Dictionary>() - let merge stream (state: StreamState<_>) = match states.TryGetValue stream with | true, current -> @@ -330,9 +329,15 @@ module Scheduling = | false, _ -> states.Add(stream, state) state - let updateWritePos stream isMalformed pos span = - merge stream (StreamState<'Format>.Create(pos, queue = span, revision = Revision.initial, malformed = isMalformed)) |> _.WritePos - + let updateStreamState stream = function + | Error malformed -> + // Flag that the data at the head of the stream is triggering a non-transient error condition from the handler, preventing any further handler dispatches for `stream` + merge stream (StreamState<'Format>.Create(write = ValueNone, queue = null, revision = Revision.initial, malformed = malformed)) |> ignore + | Ok (updatedPos, _dispatchedRevision as up: HandlerProgress) -> + // Ensure we have a position (in case it got purged); Drop any events or unfolds implied by updatedPos + merge stream (StreamState<'Format>.Create(ValueSome updatedPos, queue = null, revision = Revision.initial, malformed = false)) + // Strip unfolds out of the queue if the handler reported the position as unchanged, but the unfolds were included in the invocation + |> StreamState.tryTrimUnfoldsIffPosAndRevisionStill up |> ValueOption.iter (fun trimmed -> states[ stream ] <- trimmed) let purge () = let mutable purged = 0 for x in states do @@ -361,24 +366,14 @@ module Scheduling = | true, ss when not ss.IsEmpty && not ss.IsMalformed && (not requireAll || ss.QueuedIsAtWritePos) && not (busy.Contains s) -> ValueSome ss | _ -> ValueNone - /// Removes events based on an attained write position - member _.TrimEventsPriorTo(stream, pos) = updateWritePos stream false (ValueSome pos) null - /// Flags that the data at the head of the stream is triggering a non-transient error condition from the handler, preventing any further handler dispatches for `stream` - member _.MarkMalformed(stream, isMalformed) = updateWritePos stream isMalformed ValueNone null member _.Merge(buffered: Streams<'Format>) = for kv in buffered.States do merge kv.Key kv.Value |> ignore member _.Purge() = purge () member _.LockForWrite stream = markBusy stream - member _.DropHandledEventsAndUnlock(stream, maybeUpdatedPosAndDispatchedUnfoldsRevision) = - match maybeUpdatedPosAndDispatchedUnfoldsRevision with - | ValueNone -> () - | ValueSome (updatedPos, _dispatchedRevision as up: HandlerProgress) -> - // Ensure we have a position (in case it got purged); Drop any events or unfolds implied by updatedPos - let ss = merge stream (StreamState<'Format>.Create(ValueSome updatedPos, queue = null, revision = Revision.initial, malformed = false)) - // Strip unfolds out of the queue if the handler reported the position as unchanged, but the unfolds were included in the invocation - ss |> StreamState.tryTrimUnfoldsIffPosAndRevisionStill up |> ValueOption.iter (fun trimmed -> states[ stream ] <- trimmed) - + member _.DropHandledEventsAndUnlock(stream, outcome) = + updateStreamState stream outcome + markNotBusy stream member _.Dump(log: ILogger, totalPurged: int, eventSize) = let mutable (busyCount, busyE, busyB), (ready, readyE, readyB), synced = (0, 0, 0L), (0, 0, 0L), 0 let mutable (waiting, waitingE, waitingB), (malformed, malformedE, malformedB) = (0, 0, 0L), (0, 0, 0L) @@ -688,7 +683,7 @@ module Scheduling = (metricsLog |> Log.withMetric m).Information("Outcome {kind} in {ms:n0}ms, progressed: {progressed}", kindTag, r.duration.TotalMilliseconds, progressed) if monitorInterval.IfDueRestart() then monitor.EmitMetrics metricsLog - member x.RecordOk(r, ?force) = x.RecordOutcomeKind(r, OutcomeKind.Ok, r.index' > r.index || defaultArg force false) + member x.RecordOk(r, force) = x.RecordOutcomeKind(r, OutcomeKind.Ok, r.index' > r.index || force) member x.RecordExn(r, k, log, exn) = x.RecordOutcomeKind(r, k, progressed = false) if OutcomeKind.isException k then @@ -796,10 +791,11 @@ module Scheduling = abstract member HasCapacity: bool with get abstract member AwaitCapacity: CancellationToken -> Task abstract member TryReplenish: pending: seq> * handleStarted: (FsCodec.StreamName * int64 -> unit) -> struct (bool * bool) - abstract member InterpretProgress: StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (Result<'R, 'E> * HandlerProgress voption) + abstract member InterpretProgress: Result<'P, 'E> -> ResProgressAndMalformed> and [] Item<'Format> = { stream: FsCodec.StreamName; nextIndex: int64 voption; span: FsCodec.ITimelineEvent<'Format>[]; revision: Revision } and [] InternalRes<'R> = { stream: FsCodec.StreamName; index: int64; event: string; duration: TimeSpan; result: 'R } + and ResProgressAndMalformed<'O> = (struct ('O * Buffer.HandlerProgress voption * bool)) module InternalRes = let inline create (i: Item<_>, d, r) = let h = i.span[0] @@ -878,14 +874,14 @@ module Scheduling = // Ingest information to be gleaned from processing the results into `streams` (i.e. remove stream requirements as they are completed) let handleResult ({ stream = stream; index = i; event = et; duration = duration; result = r }: InternalRes<_>) = - match dispatcher.InterpretProgress(streams, stream, r) with - | Ok _ as r, ValueSome (index', _ as updatedPosAndDispatchedRevision) -> + match dispatcher.InterpretProgress r with + | Ok _ as r, ValueSome (index', _ as updatedPosAndDispatchedRevision), _malformed -> batches.RemoveAttainedRequirements(stream, updatedPosAndDispatchedRevision) - streams.DropHandledEventsAndUnlock(stream, ValueSome updatedPosAndDispatchedRevision) + streams.DropHandledEventsAndUnlock(stream, Ok updatedPosAndDispatchedRevision) stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = index'; result = r } - | Ok _ as r, ValueNone - | (Error _ as r), _ -> - streams.DropHandledEventsAndUnlock(stream, ValueNone) + | Ok _ as r, ValueNone, malformed + | (Error _ as r), _, malformed -> + streams.DropHandledEventsAndUnlock(stream, Error malformed) stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = r } let tryHandleResults () = tryApplyResults handleResult @@ -1046,12 +1042,12 @@ module Dispatcher = type Concurrent<'P, 'R, 'E, 'F> internal ( inner: ItemDispatcher, 'F>, project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task>>, - interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (Result<'R, 'E> * Buffer.HandlerProgress voption)) = + interpretProgress: Result<'P, 'E> -> Scheduling.ResProgressAndMalformed>) = static member Create ( maxDop, // NOTE `project` must not throw under any circumstances, or the exception will go unobserved, and DOP will leak in the dispatcher project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> Buffer.Revision -> CancellationToken -> Task>, - interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (Result<'R, 'E> * Buffer.HandlerProgress voption)) = + interpretProgress: Result<'P, 'E> -> Scheduling.ResProgressAndMalformed>) = let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task { let! res = project item.stream item.span item.revision ct return Scheduling.InternalRes.create (item, Stopwatch.elapsed startTs, res) } @@ -1061,11 +1057,11 @@ module Dispatcher = let struct (span: FsCodec.ITimelineEvent<'F>[], met) = prepare.Invoke(stream, span) try let! struct (outcome, index') = handle.Invoke(stream, span, ct) return Ok struct (outcome, Buffer.HandlerProgress.ofMetricsAndPos revision met index', met) - with e -> return Error struct (e, met) } - let interpretProgress (_streams: Scheduling.StreamStates<'F>) _stream = function - | Ok struct (outcome, hp, met) -> struct (Ok struct (outcome, met), ValueSome hp) - | Error struct (exn, met) -> Error struct (exn, met), ValueNone - Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress) + with e -> return Error struct (e, false, met) } + let interpretProgress = function + | Ok struct (outcome, hp, met) -> struct (Ok struct (outcome, met), ValueSome hp, false) + | Error struct (exn, malformed, met) -> Error struct (exn, malformed, met), ValueNone, malformed + Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress = interpretProgress) interface Scheduling.IDispatcher<'P, 'R, 'E, 'F> with [] override _.Result = inner.Result override _.Pump ct = inner.Pump ct @@ -1073,16 +1069,19 @@ module Dispatcher = override _.HasCapacity = inner.HasCapacity override _.AwaitCapacity(ct) = inner.AwaitCapacity(ct) override _.TryReplenish(pending, handleStarted) = inner.TryReplenish(pending, handleStarted, project) - override _.InterpretProgress(streams, stream, res) = interpretProgress streams stream res + override _.InterpretProgress res = interpretProgress res + + type ResProgressAndMetrics<'O> = (struct ('O * Buffer.HandlerProgress voption * StreamSpan.Metrics)) + type ExnAndMetrics = (struct(exn * bool * StreamSpan.Metrics)) + type NextIndexAndMetrics = (struct(int64 * StreamSpan.Metrics)) /// Implementation of IDispatcher that allows a supplied handler select work and declare completion based on arbitrarily defined criteria type Batched<'F> ( select: Func seq, Scheduling.Item<'F>[]>, // NOTE `handle` must not throw under any circumstances, or the exception will go unobserved - handle: Scheduling.Item<'F>[] -> CancellationToken -> - Task>[]>) = + handle: Scheduling.Item<'F>[] -> CancellationToken -> Task>[]>) = let inner = DopDispatcher 1 - let result = Event>>() + let result = Event>>() // On each iteration, we offer the ordered work queue to the selector // we propagate the selected streams to the handler @@ -1098,7 +1097,7 @@ module Dispatcher = hasCapacity <- false struct (dispatched, hasCapacity) - interface Scheduling.IDispatcher with + interface Scheduling.IDispatcher with [] override _.Result = result.Publish override _.Pump ct = task { use _ = inner.Result.Subscribe(Array.iter result.Trigger) @@ -1107,15 +1106,15 @@ module Dispatcher = override _.HasCapacity = inner.HasCapacity override _.AwaitCapacity(ct) = inner.AwaitButRelease(ct) override _.TryReplenish(pending, handleStarted) = trySelect pending handleStarted - override _.InterpretProgress(_streams: Scheduling.StreamStates<_>, _stream: FsCodec.StreamName, res: Result<_, _>) = + override _.InterpretProgress(res: Result<_, _>) = match res with - | Ok (pos', met) -> Ok ((), met), ValueSome (Buffer.HandlerProgress.ofPos pos') - | Error (exn, met) -> Error (exn, met), ValueNone + | Ok (pos', met) -> Ok ((), met), ValueSome (Buffer.HandlerProgress.ofPos pos'), false + | Error (exn, malformed, met) -> Error (exn, malformed, met), ValueNone, malformed [] type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, [] ?failThreshold, [] ?abendThreshold, [] ?logExternalStats) = - inherit Scheduling.Stats( + inherit Scheduling.Stats( log, statsInterval, statesInterval, ?failThreshold = failThreshold, ?abendThreshold = abendThreshold, ?logExternalStats = logExternalStats) let mutable okStreams, okEvents, okUnfolds, okBytes, exnStreams, exnCats, exnEvents, exnUnfolds, exnBytes = HashSet(), 0, 0, 0L, HashSet(), Stats.Counters(), 0, 0, 0L let mutable resultOk, resultExn = 0, 0 @@ -1142,9 +1141,9 @@ type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, okUnfolds <- okUnfolds + us okBytes <- okBytes + int64 bs resultOk <- resultOk + 1 - base.RecordOk(res, force = (us <> 0)) + base.RecordOk(res, us <> 0) this.HandleOk outcome - | { duration = duration; stream = stream; index = index; event = et; result = Error (Exception.Inner exn, (es, us, bs)) } -> + | { duration = duration; stream = stream; index = index; event = et; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) } -> exnCats.Ingest(StreamName.categorize stream) exnStreams.Add stream |> ignore exnEvents <- exnEvents + es @@ -1243,17 +1242,17 @@ type Batched private () = ( log: ILogger, maxReadAhead, select: Func seq, Scheduling.Item<'F>[]>, handle: Func[], CancellationToken, Task * TimeSpan)>>>, - eventSize, stats: Scheduling.Stats<_, _>, + eventSize, stats: Scheduling.Stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?requireAll, [] ?ingesterStateInterval, [] ?commitInterval) : Propulsion.SinkPipeline seq>> = let handle (items: Scheduling.Item<'F>[]) ct - : Task>[]> = task { + : Task>[]> = task { let start = Stopwatch.timestamp () let err ts e (x: Scheduling.Item<_>) = let met = StreamSpan.metrics eventSize x.span - Scheduling.InternalRes.create (x, ts, Error struct (e, met)) + Scheduling.InternalRes.create (x, ts, Error struct (e, false, met)) try let! results = handle.Invoke(items, ct) return Array.ofSeq (Seq.zip items results |> Seq.map (function | item, (Ok index', ts) -> diff --git a/src/Propulsion/Sync.fs b/src/Propulsion/Sync.fs index 0fd3bc77..87768d6d 100644 --- a/src/Propulsion/Sync.fs +++ b/src/Propulsion/Sync.fs @@ -9,7 +9,7 @@ open System.Collections.Generic [] type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [] ?failThreshold) = - inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) + inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) let mutable okStreams, okEvents, okUnfolds, okBytes, exnStreams, exnEvents, exnUnfolds, exnBytes = HashSet(), 0, 0, 0L, HashSet(), 0, 0, 0L let prepareStats = Stats.LatencyStats("prepare") override _.DumpStats() = @@ -30,9 +30,9 @@ type Stats<'Outcome>(log: ILogger, statsInterval, stateInterval, [] ? okUnfolds <- okUnfolds + us okBytes <- okBytes + int64 bs prepareStats.Record prepareElapsed - base.RecordOk(message, force = (us <> 0)) + base.RecordOk(message, us <> 0) this.HandleOk outcome - | { stream = stream; result = Error (Exception.Inner exn, (es, us, bs)) } -> + | { stream = stream; result = Error (Exception.Inner exn, _malformed, (es, us, bs)) } -> exnStreams.Add stream |> ignore exnEvents <- exnEvents + es exnUnfolds <- exnUnfolds + us @@ -58,18 +58,18 @@ type Factory private () = let prepareTs = Stopwatch.timestamp () try let! outcome, index' = handle.Invoke(stream, trimmed, ct) return Ok struct (outcome, Buffer.HandlerProgress.ofMetricsAndPos revision met index', met, Stopwatch.elapsed prepareTs) - with e -> return Error struct (e, met) } + with e -> return Error struct (e, false, met) } - let interpretProgress _streams _stream = function - | Ok struct (outcome, progress, met: StreamSpan.Metrics, prep) -> struct (Ok struct (outcome, met, prep), ValueSome progress) - | Error struct (exn: exn, met) -> Error struct (exn, met), ValueNone + let interpretProgress = function + | Ok struct (outcome, hp, met: StreamSpan.Metrics, prep) -> struct (Ok struct (outcome, met, prep), ValueSome hp, false) + | Error struct (exn: exn, malformed, met) -> Error struct (exn, malformed, met), ValueNone, malformed - let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress) + let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress = interpretProgress) let dumpStreams logStreamStates log = logStreamStates eventSize match dumpExternalStats with Some f -> f log | None -> () let scheduler = - Scheduling.Engine + Scheduling.Engine (dispatcher, stats, dumpStreams, pendingBufferSize = maxReadAhead, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval) Factory.Start(log, scheduler.Pump, maxReadAhead, scheduler,