-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!(Streams): Support propagating Unfolds; remove StreamResult #264
Changes from 1 commit
c4c09df
a112eb7
207269f
a46e104
a9261d8
d32a4a5
bcacc6c
642d189
50d6eb8
b375d90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ module Pruner = | |
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred) | ||
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events | ||
let writePos = max trimmedPos (untilIndex + 1L) | ||
return struct (writePos, res) } | ||
return struct (res, writePos) } | ||
|
||
type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) = | ||
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold) | ||
|
@@ -75,8 +75,8 @@ type CosmosStorePruner = | |
#endif | ||
let interpret _stream span = | ||
let metrics = StreamSpan.metrics Event.storedSize span | ||
struct (metrics, span) | ||
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r)) | ||
struct (span, metrics) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any extra stuff layered on goes after the primary data in the tuple - no idea how/why I ended up having it backwards, but these signatures are undergoing change as part of putting the pos after the result anyway |
||
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil) | ||
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize | ||
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, | ||
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,8 +15,7 @@ module private Impl = | |
type EventBody = byte[] // V4 defines one directly, here we shim it | ||
module StreamSpan = | ||
|
||
let private toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() | ||
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody | ||
let toNativeEventBody (xs: Propulsion.Sinks.EventBody): byte[] = xs.ToArray() | ||
// Trimmed edition of what V4 exposes | ||
module internal Equinox = | ||
module CosmosStore = | ||
|
@@ -34,10 +33,9 @@ module private Impl = | |
|
||
// v4 and later use JsonElement, but Propulsion is using ReadOnlyMemory<byte> rather than assuming and/or offering optimization for JSON bodies | ||
open System.Text.Json | ||
let private toNativeEventBody (x: EventBody): JsonElement = | ||
let toNativeEventBody (x: EventBody): JsonElement = | ||
if x.IsEmpty then JsonElement() | ||
else JsonSerializer.Deserialize(x.Span) | ||
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody | ||
#endif | ||
|
||
module Internal = | ||
|
@@ -49,38 +47,42 @@ module Internal = | |
type [<NoComparison; NoEquality; RequireQualifiedAccess>] Result = | ||
| Ok of updatedPos: int64 | ||
| Duplicate of updatedPos: int64 | ||
| PartialDuplicate of overage: Event[] | ||
| PrefixMissing of batch: Event[] * writePos: int64 | ||
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (StreamSpan.Metrics * Result), struct (StreamSpan.Metrics * exn)>) = | ||
| PartialDuplicate of updatedPos: int64 | ||
| PrefixMissing of gap: int * actualPos: int64 | ||
let logTo (log: ILogger) malformed (res: StreamName * Result<struct (Result * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>) = | ||
match res with | ||
| stream, Ok (_, Result.Ok pos) -> | ||
| stream, Ok (Result.Ok pos, _) -> | ||
log.Information("Wrote {stream} up to {pos}", stream, pos) | ||
| stream, Ok (_, Result.Duplicate updatedPos) -> | ||
| stream, Ok (Result.Duplicate updatedPos, _) -> | ||
log.Information("Ignored {stream} (synced up to {pos})", stream, updatedPos) | ||
| stream, Ok (_, Result.PartialDuplicate overage) -> | ||
log.Information("Requeuing {stream} {pos} ({count} events)", stream, overage[0].Index, overage.Length) | ||
| stream, Ok (_, Result.PrefixMissing (batch, pos)) -> | ||
log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index) | ||
| stream, Error (_, exn) -> | ||
| stream, Ok (Result.PartialDuplicate updatedPos, _) -> | ||
log.Information("Requeuing {stream} {pos}", stream, updatedPos) | ||
| stream, Ok (Result.PrefixMissing (gap, pos), _) -> | ||
log.Information("Waiting {stream} missing {gap} events before {pos}", stream, gap, pos) | ||
| stream, Error (exn, _) -> | ||
let level = if malformed then LogEventLevel.Warning else Events.LogEventLevel.Information | ||
log.Write(level, exn, "Writing {stream} failed, retrying", stream) | ||
|
||
let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task { | ||
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length) | ||
let i = StreamSpan.index span | ||
let n = StreamSpan.nextIndex span | ||
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prep for #263 |
||
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length) | ||
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody | ||
#if COSMOSV3 | ||
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _)) | ||
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData) | ||
|> Async.executeAsTask ct | ||
#else | ||
let! res = ctx.Sync(stream, { index = span[0].Index; etag = None }, span |> Array.map (fun x -> StreamSpan.defaultToNative_ x :> _), ct) | ||
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct) | ||
#endif | ||
let res' = | ||
match res with | ||
| AppendResult.Ok pos -> Result.Ok pos.index | ||
| AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos -> | ||
match pos.index with | ||
| actual when actual < span[0].Index -> Result.PrefixMissing (span, actual) | ||
| actual when actual >= span[0].Index + span.LongLength -> Result.Duplicate actual | ||
| actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int)) | ||
| actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual) | ||
| actual when actual >= n -> Result.Duplicate actual | ||
| actual -> Result.PartialDuplicate actual | ||
log.Debug("Result: {res}", res') | ||
return res' } | ||
let containsMalformedMessage e = | ||
|
@@ -103,40 +105,42 @@ module Internal = | |
let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024) | ||
let writerResultLog = log.ForContext<Writer.Result>() | ||
let attemptWrite stream span ct = task { | ||
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span | ||
let struct (span, met) = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span | ||
#if COSMOSV3 | ||
try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct | ||
try let! res = Writer.write log eventsContext (StreamName.toString stream) span ct | ||
#else | ||
try let! res = Writer.write log eventsContext stream span' ct | ||
try let! res = Writer.write log eventsContext stream span ct | ||
#endif | ||
return Ok struct (met, res) | ||
with e -> return Error struct (met, e) } | ||
return Ok struct (res, met) | ||
with e -> return Error struct (e, met) } | ||
let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = | ||
let applyResultToStreamState = function | ||
| Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false) | ||
| Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false | ||
| Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false | ||
| Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false | ||
| Error struct (_stats, exn) -> | ||
| Ok struct ((Writer.Result.Ok pos' | Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) -> | ||
let ss = streams.RecordWriteProgress(stream, pos', null) | ||
struct (ss.WritePos, false) | ||
| Ok (Writer.Result.PrefixMissing _, _stats) -> | ||
streams.WritePos(stream), false | ||
| Error struct (exn, _stats) -> | ||
let malformed = Writer.classify exn |> Writer.isMalformed | ||
streams.SetMalformed(stream, malformed), malformed | ||
let struct (ss, malformed) = applyResultToStreamState res | ||
let ss = streams.SetMalformed(stream, malformed) | ||
ss.WritePos, malformed | ||
let struct (writePos, malformed) = applyResultToStreamState res | ||
Writer.logTo writerResultLog malformed (stream, res) | ||
struct (ss.WritePos, res) | ||
struct (res, writePos) | ||
Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress) | ||
|
||
type WriterResult = Internal.Writer.Result | ||
|
||
type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) = | ||
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>( | ||
inherit Scheduling.Stats<struct (WriterResult * StreamSpan.Metrics), struct (exn * StreamSpan.Metrics)>( | ||
log, statsInterval, stateInterval, ?failThreshold = failThreshold, | ||
logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump) | ||
let mutable okStreams, okEvents, okBytes = HashSet(), 0, 0L | ||
let mutable exnCats, exnStreams, exnEvents, exnBytes = Stats.Counters(), HashSet(), 0, 0L | ||
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0 | ||
override _.Handle message = | ||
match message with | ||
| { stream = stream; result = Ok ((es, bs), res) } -> | ||
| { stream = stream; result = Ok (res, (es, bs)) } -> | ||
okStreams.Add stream |> ignore | ||
okEvents <- okEvents + es | ||
okBytes <- okBytes + int64 bs | ||
|
@@ -146,7 +150,7 @@ type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D nul | |
| WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 | ||
| WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 | ||
base.RecordOk(message) | ||
| { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> | ||
| { stream = stream; result = Error (Exception.Inner exn, (es, bs)) } -> | ||
exnCats.Ingest(StreamName.categorize stream) | ||
exnStreams.Add stream |> ignore | ||
exnEvents <- exnEvents + es | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for alignment with
res, events
with equinox result tuples