Skip to content

Commit

Permalink
Default args to include sys and unfolds
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 18, 2024
1 parent 2f919ce commit 685de1f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ module Internal =
struct (streams.SetWritePos(stream, pos'), false)
| Ok ((Writer.Result.Duplicate pos' | Writer.Result.PartialDuplicate pos'), _stats) ->
streams.SetWritePos(stream, pos') |> ignore // throw away the events (but not the unfolds)
ValueNone, false // Don't declare progress
ValueNone, false // Don't declare progress yet, until any unfolds have been handled
| Ok (Writer.Result.PrefixMissing _, _stats) ->
streams.WritePos(stream), false
| Error struct (exn, _stats) ->
Expand Down
14 changes: 10 additions & 4 deletions tools/Propulsion.Tool/Sync.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-F"; Unique>] Follow
| [<AltCommandLine "-A"; Unique>] RequireAll
| [<AltCommandLine "-E"; Unique>] EventsOnly
| [<AltCommandLine "-C"; Unique>] Categorize
| [<AltCommandLine "-b"; Unique>] MaxItems of int

| [<AltCommandLine "-I"; AltCommandLine "--include-system"; Unique>] IncSys
| [<AltCommandLine "-N"; AltCommandLine "--exclude-system"; Unique>] ExcSys
| [<AltCommandLine "-cat"; AltCommandLine "--include-category">] IncCat of regex: string
| [<AltCommandLine "-ncat"; AltCommandLine "--exclude-category">] ExcCat of regex: string
| [<AltCommandLine "-sn"; AltCommandLine "--include-streamname">] IncStream of regex: string
Expand All @@ -37,10 +38,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
"NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
"NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
| EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
| Categorize -> "Gather handler latency stats by category"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"

| IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix."
| ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
| IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
| ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)."
| IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests."
Expand All @@ -56,9 +58,10 @@ and Arguments(c, p: ParseResults<Parameters>) =
member val Filters = Propulsion.StreamFilter(
allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat,
allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream,
includeSystem = p.Contains IncSys,
includeSystem = not (p.Contains ExcSys),
allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
member val Categorize = p.Contains Categorize
member val IncludeUnfolds = not (p.Contains EventsOnly)
member val Command =
match p.GetSubCommand() with
| Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
Expand Down Expand Up @@ -233,7 +236,10 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
Some p
| SubCommand.Stats _ | SubCommand.Sync _ -> None
let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
let parse = a.Filters.CreateStreamFilter() |> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
let parse =
a.Filters.CreateStreamFilter()
|> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
Expand Down

0 comments on commit 685de1f

Please sign in to comment.