diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs
index d9870b90..2904edff 100644
--- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs
+++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -121,7 +121,7 @@ module Internal =
                     struct (streams.SetWritePos(stream, pos'), false)
                 | Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
                     streams.SetWritePos(stream, pos') |> ignore // throw away the events (but not the unfolds)
-                    ValueNone, false // Don't declare progress
+                    ValueNone, false // Don't declare progress yet, until any unfolds have been handled
                 | Ok (Writer.Result.PrefixMissing _, _stats) ->
                     streams.WritePos(stream), false
                 | Error struct (exn, _stats) ->
diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs
index af362f8b..875977bf 100644
--- a/tools/Propulsion.Tool/Sync.fs
+++ b/tools/Propulsion.Tool/Sync.fs
@@ -12,10 +12,11 @@ type [] Parameters =
     | [] FromTail
     | [] Follow
     | [] RequireAll
+    | [] EventsOnly
     | [] Categorize
     | [] MaxItems of int
 
-    | [] IncSys
+    | [] ExcSys
     | [] IncCat of regex: string
     | [] ExcCat of regex: string
     | [] IncStream of regex: string
@@ -37,10 +38,11 @@ type [] Parameters =
                 "NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
                 "NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
                 "Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
+            | EventsOnly ->     "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
             | Categorize ->     "Gather handler latency stats by category"
             | MaxItems _ ->     "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"
 
-            | IncSys ->         "Include System streams. Default: Exclude Index Streams, identified by a $ prefix."
+            | ExcSys ->         "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
             | IncCat _ ->       "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
             | ExcCat _ ->       "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)."
             | IncStream _ ->    "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests."
@@ -56,9 +58,10 @@ and Arguments(c, p: ParseResults) =
     member val Filters = Propulsion.StreamFilter(
                                             allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat,
                                             allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream,
-                                            includeSystem = p.Contains IncSys,
+                                            includeSystem = not (p.Contains ExcSys),
                                             allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
     member val Categorize = p.Contains Categorize
+    member val IncludeUnfolds = not (p.Contains EventsOnly)
     member val Command = match p.GetSubCommand() with
                          | Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
@@ -233,7 +236,10 @@ let run appName (c: Args.Configuration, p: ParseResults) = async {
             Some p
         | SubCommand.Stats _ | SubCommand.Sync _ -> None
     let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
-    let parse = a.Filters.CreateStreamFilter() |> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
+    let parse =
+        a.Filters.CreateStreamFilter()
+        |> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
+           else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
    let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
    let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
    let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
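Note on the parse selection in the final hunk: the new EventsOnly switch drives Arguments.IncludeUnfolds, which picks between the two EquinoxSystemTextJsonParser entry points referenced above. A minimal sketch of that selection written as a standalone function follows; the chooseParser/streamFilter names are illustrative only and not part of the change.

    // Sketch only: equivalent of the conditional pipeline in the final hunk above,
    // assuming the same Propulsion.CosmosStore.EquinoxSystemTextJsonParser entry points used in the diff.
    let chooseParser includeUnfolds streamFilter =
        if includeUnfolds then
            // Default: surface both events and unfolds for streams passing the filter
            Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream streamFilter
        else
            // EventsOnly: unfolds are skipped at parse time; only events are surfaced
            Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream streamFilter

    // Hypothetical usage, mirroring the shape of the tool's own binding:
    // let parse = a.Filters.CreateStreamFilter() |> chooseParser a.IncludeUnfolds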