Ingester.QueryStream: Add support for ignoring context cancellation for chunk queriers (#6408)

* Ingester.QueryStream: Add support for ignoring context cancellation for chunk queriers

In Ingester.QueryStream, support ignoring context cancellation for chunk
queriers, restoring how it worked prior to
#6085. If the context does get canceled,
log it to the span with full diagnostics.

Also wrap some chunk querying errors.
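
For background, `context.WithoutCancel` (added in Go 1.21) is what makes this possible: it returns a child context that keeps the parent's values but is never canceled. A minimal sketch, not part of this PR:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())

	// detached inherits parent's values but not its cancellation signal.
	detached := context.WithoutCancel(parent)

	cancel() // cancel the parent

	fmt.Println(parent.Err())   // context.Canceled
	fmt.Println(detached.Err()) // <nil>: the detached context is never canceled
}
```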

---------

Signed-off-by: Arve Knudsen <[email protected]>
aknuds1 authored Oct 19, 2023
1 parent 0b7be0b commit f737a96
Showing 6 changed files with 56 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@
  * `requestRateLimitedError` is mapped to `codes.Unavailable` or `codes.ResourceExhausted` instead of the non-standard `529` (The service is overloaded) or `http.StatusTooManyRequests` (429).
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis via the limit `blocked_queries`. #5609
* [FEATURE] Vault: Added support for new Vault authentication methods: `AppRole`, `Kubernetes`, `UserPass` and `Token`. #6143
* [FEATURE] Ingester: Experimental support for ignoring context cancellation when querying chunks, useful in ruling out the query engine's potential role in unexpected query cancellations. Enable with `-ingester.chunks-query-ignore-cancellation`. #6408
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
* [ENHANCEMENT] Query-frontend: add `cortex_query_frontend_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. When query-scheduler is in use, the metric has the `scheduler_address` label to differentiate the enqueue duration by query-scheduler backend. #5879 #6087 #6120
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -2925,6 +2925,17 @@
"fieldFlag": "ingester.error-sample-rate",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "chunks_query_ignore_cancellation",
"required": false,
"desc": "Ignore cancellation when querying chunks.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.chunks-query-ignore-cancellation",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1217,6 +1217,8 @@ Usage of ./cmd/mimir/mimir:
After what time a series is considered to be inactive. (default 10m0s)
-ingester.active-series-metrics-update-period duration
How often to update active series metrics. (default 1m0s)
-ingester.chunks-query-ignore-cancellation
[experimental] Ignore cancellation when querying chunks.
-ingester.client.backoff-max-period duration
Maximum delay when backing off. (default 10s)
-ingester.client.backoff-min-period duration
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -110,6 +110,8 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.failure-execution-threshold`
- `-ingester.client.circuit-breaker.period`
- `-ingester.client.circuit-breaker.cooldown-period`
- Ignoring chunks query cancellation
- `-ingester.chunks-query-ignore-cancellation`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
- Streaming chunks from ingester to querier (`-querier.prefer-streaming-chunks-from-ingesters`, `-querier.streaming-chunks-per-ingester-buffer-size`)
@@ -1113,6 +1113,10 @@ instance_limits:
# all of them.
# CLI flag: -ingester.error-sample-rate
[error_sample_rate: <int> | default = 0]
# (experimental) Ignore cancellation when querying chunks.
# CLI flag: -ingester.chunks-query-ignore-cancellation
[chunks_query_ignore_cancellation: <boolean> | default = false]
```

### querier
43 changes: 36 additions & 7 deletions pkg/ingester/ingester.go
@@ -179,6 +179,8 @@ type Config struct {
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"`

ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"`

ChunksQueryIgnoreCancellation bool `yaml:"chunks_query_ignore_cancellation" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -197,6 +199,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.")
f.BoolVar(&cfg.ChunksQueryIgnoreCancellation, "ingester.chunks-query-ignore-cancellation", false, "Ignore cancellation when querying chunks.")
}

func (cfg *Config) Validate() error {
@@ -1612,6 +1615,8 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024

// QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
start := time.Now()

if err := i.checkRunning(); err != nil {
return err
}
@@ -1667,12 +1672,23 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
}

if streamType == QueryStreamChunks {
chunksCtx := ctx
if i.cfg.ChunksQueryIgnoreCancellation {
// Pass an independent context, to help investigate a problem with ingester queries
// getting canceled. Prior to https://github.com/grafana/mimir/pull/6085, Prometheus chunk
// queriers actually ignored the context, so we are emulating that behavior.
chunksCtx = context.WithoutCancel(ctx)
}
if req.StreamingChunksBatchSize > 0 {
level.Debug(spanlog).Log("msg", "using executeStreamingQuery")
numSeries, numSamples, err = i.executeStreamingQuery(ctx, db, int64(from), int64(through), matchers, shard, stream, req.StreamingChunksBatchSize, spanlog)
numSeries, numSamples, err = i.executeStreamingQuery(chunksCtx, db, int64(from), int64(through), matchers, shard, stream, req.StreamingChunksBatchSize, spanlog)
} else {
level.Debug(spanlog).Log("msg", "using executeChunksQuery")
numSeries, numSamples, err = i.executeChunksQuery(ctx, db, int64(from), int64(through), matchers, shard, stream)
numSeries, numSamples, err = i.executeChunksQuery(chunksCtx, db, int64(from), int64(through), matchers, shard, stream)
}

if i.cfg.ChunksQueryIgnoreCancellation && (ctx.Err() != nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
dumpContextError(ctx, err, start, spanlog)
}
} else {
level.Debug(spanlog).Log("msg", "using executeSamplesQuery")
@@ -1688,6 +1704,19 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}
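
The guard above checks for cancellation in two places because, even with a detached chunks context, the outer `ctx` still governs the gRPC stream, and the queriers may return a wrapped cancellation error. A sketch of that predicate in isolation (`isCancellation` is a hypothetical helper, not in this PR):

```go
package ingester

import (
	"context"
	"errors"
)

// isCancellation is a hypothetical helper mirroring the guard in QueryStream:
// a cancellation can surface directly on the (original) context, or wrapped
// anywhere inside the error returned by the chunk queriers.
func isCancellation(ctx context.Context, err error) bool {
	return ctx.Err() != nil ||
		errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded)
}
```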

// dumpContextError logs the context error for diagnosis.
func dumpContextError(ctx context.Context, err error, start time.Time, spanlog *spanlogger.SpanLogger) {
deadline, deadlineSet := ctx.Deadline()
var timeout string
if deadlineSet {
timeout = fmt.Sprintf("%.2f seconds", deadline.Sub(start).Seconds())
} else {
timeout = "not set"
}
level.Debug(spanlog).Log("msg", "query context error", "cause", context.Cause(ctx), "timeout", timeout,
"err", err)
}
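
`dumpContextError` logs `context.Cause(ctx)`, which reports why a context was canceled when the canceller recorded a cause (Go 1.20+). A minimal standalone sketch, assuming plain stdlib with no span logger:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("client went away")) // record a specific cause

	fmt.Println(ctx.Err())          // context.Canceled
	fmt.Println(context.Cause(ctx)) // client went away
}
```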

func (i *Ingester) executeSamplesQuery(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(from, through)
if err != nil {
@@ -1796,7 +1825,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t
// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(ctx, false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
return 0, 0, errors.Wrap(ss.Err(), "selecting series from ChunkQuerier")
}

chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
@@ -1852,7 +1881,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, err
return 0, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
}

// Finally, flush any existing metrics
@@ -1943,7 +1972,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk
// Series must be sorted so that they can be read by the querier in the order the PromQL engine expects.
ss := q.Select(ctx, true, hints, matchers...)
if ss.Err() != nil {
return nil, 0, ss.Err()
return nil, 0, errors.Wrap(ss.Err(), "selecting series from ChunkQuerier")
}

seriesInBatch := make([]client.QueryStreamSeries, 0, queryStreamBatchSize)
Expand Down Expand Up @@ -1972,7 +2001,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

chunkCount, err := series.ChunkCount()
if err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "getting ChunkSeries chunk count")
}

seriesInBatch = append(seriesInBatch, client.QueryStreamSeries{
@@ -2003,7 +2032,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
}

return allSeriesList, seriesCount, nil
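On the `errors.Wrap` changes above: wrapping annotates where a chunk-querying error came from without hiding its cause, so cancellation checks like the one in `QueryStream` still match. A small sketch (assuming `github.com/pkg/errors`, which these diff hunks appear to use):

```go
package main

import (
	"context"
	stderrors "errors"
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	err := errors.Wrap(context.Canceled, "iterating ChunkSeriesSet")

	fmt.Println(err)                                 // iterating ChunkSeriesSet: context canceled
	fmt.Println(stderrors.Is(err, context.Canceled)) // true: the original cause is preserved
}
```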
