Skip to content

Commit

Permalink
Initial impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 22, 2024
1 parent 52b1405 commit 3f25d69
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 178 deletions.
74 changes: 38 additions & 36 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ module Internal =
| Duplicate of updatedPos: int64
| PartialDuplicate of updatedPos: int64
| PrefixMissing of gap: int * actualPos: int64
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (Result * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>) =
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (Result * StreamSpan.Metrics * Buffer.Revision), struct (exn * StreamSpan.Metrics)>) =
match res with
| stream, Ok (Result.Ok pos, _) ->
| stream, Ok (Result.Ok pos, _, _) ->
log.Information("Wrote {stream} up to {pos}", stream, pos)
| stream, Ok (Result.Duplicate updatedPos, _) ->
| stream, Ok (Result.Duplicate updatedPos, _, _) ->
log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos)
| stream, Ok (Result.PartialDuplicate updatedPos, _) ->
| stream, Ok (Result.PartialDuplicate updatedPos, _, _) ->
log.Information("Requeuing {stream} {pos}", stream, updatedPos)
| stream, Ok (Result.PrefixMissing (gap, pos), _) ->
| stream, Ok (Result.PrefixMissing (gap, pos), _, _) ->
log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos)
| stream, Error (exn, _) ->
let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information
Expand Down Expand Up @@ -106,58 +106,60 @@ module Internal =
static member Create(log: ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) =
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024)
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span ct = task {
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
let attemptWrite stream span revision ct = task {
let struct (trimmed, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
#if COSMOSV3
try let! res = Writer.write log eventsContext (StreamName.toString stream) span ct
try let! res = Writer.write log eventsContext (StreamName.toString stream) trimmed ct
#else
try let! res = Writer.write log eventsContext stream span ct
try let! res = Writer.write log eventsContext stream trimmed ct
#endif
return Ok struct (res, met)
return Ok struct (res, met, revision)
with e -> return Error struct (e, met) }
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (Writer.Result.Ok pos', _stats) ->
struct (streams.SetWritePos(stream, pos'), false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.SetWritePos(stream, pos') |> ignore // Trim any events that conflict with stuff already in the store
ValueNone, false // Don't declare progress until any unfolds have been handled
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream), false
| Error struct (exn, _stats) ->
| Ok struct (Writer.Result.Ok pos', met, revision) ->
streams.TrimEventsPriorTo(stream, pos') |> ignore
struct (Buffer.HandlerProgress.ofMetricsAndPos revision met pos' |> ValueSome, false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _met, _revision) ->
streams.TrimEventsPriorTo(stream, pos') |> ValueOption.map Buffer.HandlerProgress.ofPos, false // Trim any events that conflict with stuff already in the store
| Ok (Writer.Result.PrefixMissing _, _met, _revision) ->
ValueNone, false
| Error struct (exn, _met) ->
let malformed = Writer.classify exn |> Writer.isMalformed
streams.MarkMalformed(stream, malformed), malformed
let struct (writePos, malformed) = applyResultToStreamState res
streams.MarkMalformed(stream, malformed) |> ignore
ValueNone, malformed
let struct (handlerProgress, malformed) = applyResultToStreamState res
Writer.logTo writerResultLog malformed (stream, res)
struct (res, writePos)
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress)
struct (res, handlerProgress)
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress = interpretProgress)

type WriterResult = Internal.Writer.Result

type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>(
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics * Buffer.Revision), struct (exn * StreamSpan.Metrics)>(
log, statsInterval, stateInterval, ?failThreshold = failThreshold,
logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump)
let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L
let mutable exnCats, exnStreams, exnEvents, exnBytes = Stats.Counters(), HashSet(), 0, 0L
let mutable okStreams, okEvents, okUnfolds, okBytes = HashSet(), 0, 0, 0L
let mutable exnCats, exnStreams, exnEvents, exnUnfolds, exnBytes = Stats.Counters(), HashSet(), 0, 0, 0L
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
match message with
| { stream = stream; result = Ok (res, (es, bs)) } ->
| { stream = stream; result = Ok (res, (es, us, bs), _revision) } ->
okStreams.Add stream |> ignore
okEvents <- okEvents + es
okUnfolds <- okUnfolds + us
okBytes <- okBytes + int64 bs
match res with
| WriterResult.Ok _ -> resultOk <- resultOk + 1
| WriterResult.Duplicate _ -> resultDup <- resultDup + 1
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
// Plain RecordOk is insufficient as the lack of movement in the index would ordinarily imply it's stalled
base.RecordOk(message, progressed = true)
| { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } ->
base.RecordOk(message, force = (us <> 0))
| { stream = stream; result = Error (Exception.Inner exn, (es, us, bs)) } ->
exnCats.Ingest(StreamName.categorize stream)
exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
exnUnfolds <- exnUnfolds + us
exnBytes <- exnBytes + int64 bs
resultExn <- resultExn + 1
let kind =
Expand All @@ -167,16 +169,16 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D nul
| Internal.Writer.ResultKind.TooLarge -> OutcomeKind.Tagged "tooLarge"
| Internal.Writer.ResultKind.Malformed -> OutcomeKind.Tagged "malformed"
| Internal.Writer.ResultKind.Other -> OutcomeKind.Exn
base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn)
base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es).ForContext("unfolds", us), exn)
override _.DumpStats() =
let results = resultOk + resultDup + resultPartialDup + resultPrefix
log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)",
Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix)
okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L
log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)",
Log.miB okBytes, results, okStreams.Count, okEvents, okUnfolds, resultOk, resultDup, resultPartialDup, resultPrefix)
okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okUnfolds <-0; okBytes <- 0L
if exnCats.Any then
log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e",
Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents)
resultExn <- 0; exnBytes <- 0L; exnEvents <- 0
log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e {unfolds:n0}u",
Log.miB exnBytes, resultExn, exnStreams.Count, exnEvents, exnUnfolds)
resultExn <- 0; exnBytes <- 0L; exnEvents <- 0; exnUnfolds <- 0
log.Warning(" Affected cats {@exnCats} Streams {@exnStreams}",
exnCats.StatsDescending |> Seq.truncate 50, exnStreams |> Seq.truncate 100)
exnCats.Clear(); exnStreams.Clear()
Expand Down
16 changes: 8 additions & 8 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module Internal =
let writerResultLog = log.ForContext<Writer.Result>()
let mutable robin = 0

let attemptWrite stream span ct = task {
let attemptWrite stream span _revision ct = task {
let index = System.Threading.Interlocked.Increment(&robin) % connections.Length
let selectedConnection = connections[index]
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
Expand All @@ -76,15 +76,15 @@ module Internal =
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.SetWritePos(stream, pos')
streams.TrimEventsPriorTo(stream, pos')
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream)
ValueNone
| Error struct (_stats, _exn) ->
streams.MarkMalformed(stream, false)
let writePos = applyResultToStreamState res
Writer.logTo writerResultLog (stream, res)
struct (res, writePos)
Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress)
struct (res, writePos |> ValueOption.map Buffer.HandlerProgress.ofPos)
Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, project = attemptWrite, interpretProgress = interpretProgress)

type WriterResult = Internal.Writer.Result

Expand All @@ -96,7 +96,7 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
override _.Handle message =
match message with
| { stream = stream; result = Ok (res, (es, bs)) } ->
| { stream = stream; result = Ok (res, (es, us, bs)) } ->
okStreams.Add stream |> ignore
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
Expand All @@ -105,8 +105,8 @@ type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null
| WriterResult.Duplicate _ -> resultDup <- resultDup + 1
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1
base.RecordOk(message)
| { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } ->
base.RecordOk(message, force = (us <> 0))
| { stream = stream; result = Error (Exception.Inner exn, (es, _us, bs)) } ->
exnCats.Ingest(StreamName.categorize stream)
exnStreams.Add stream |> ignore
exnEvents <- exnEvents + es
Expand Down
Loading

0 comments on commit 3f25d69

Please sign in to comment.