Ingester.QueryStream: Add support for ignoring context cancellation for chunk queriers #6408

Merged · 15 commits · Oct 19, 2023
CHANGELOG.md (1 addition, 0 deletions)

@@ -19,6 +19,7 @@
  * `requestRateLimitedError` is mapped to `codes.Unavailable` or `codes.ResourceExhausted` instead of the non-standard `529` (The service is overloaded) or `http.StatusTooManyRequests` (429).
* [FEATURE] Query-frontend: add experimental support for query blocking. Queries are blocked on a per-tenant basis, configured via the limit `blocked_queries`. #5609
* [FEATURE] Vault: Added support for new Vault authentication methods: `AppRole`, `Kubernetes`, `UserPass` and `Token`. #6143
* [FEATURE] Ingester: Experimental support for ignoring context cancellation when querying chunks, useful when investigating whether the query engine is the source of unexpected query cancellations. Enable with `-ingester.chunks-query-ignore-cancellation`. #6408
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
* [ENHANCEMENT] Query-frontend: add `cortex_query_frontend_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. When query-scheduler is in use, the metric has the `scheduler_address` label to differentiate the enqueue duration by query-scheduler backend. #5879 #6087 #6120
cmd/mimir/config-descriptor.json (11 additions, 0 deletions)

@@ -2925,6 +2925,17 @@
"fieldFlag": "ingester.error-sample-rate",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "chunks_query_ignore_cancellation",
"required": false,
"desc": "Ignore cancellation when querying chunks.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.chunks-query-ignore-cancellation",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
cmd/mimir/help-all.txt.tmpl (2 additions, 0 deletions)

@@ -1217,6 +1217,8 @@ Usage of ./cmd/mimir/mimir:
After what time a series is considered to be inactive. (default 10m0s)
-ingester.active-series-metrics-update-period duration
How often to update active series metrics. (default 1m0s)
-ingester.chunks-query-ignore-cancellation
[experimental] Ignore cancellation when querying chunks.
-ingester.client.backoff-max-period duration
Maximum delay when backing off. (default 10s)
-ingester.client.backoff-min-period duration
docs/sources/mimir/configure/about-versioning.md (2 additions, 0 deletions)

@@ -110,6 +110,8 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.failure-execution-threshold`
- `-ingester.client.circuit-breaker.period`
- `-ingester.client.circuit-breaker.cooldown-period`
- Ignoring chunks query cancellation
- `-ingester.chunks-query-ignore-cancellation`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
- Streaming chunks from ingester to querier (`-querier.prefer-streaming-chunks-from-ingesters`, `-querier.streaming-chunks-per-ingester-buffer-size`)
@@ -1113,6 +1113,10 @@ instance_limits:
# all of them.
# CLI flag: -ingester.error-sample-rate
[error_sample_rate: <int> | default = 0]
# (experimental) Ignore cancellation when querying chunks.
# CLI flag: -ingester.chunks-query-ignore-cancellation
[chunks_query_ignore_cancellation: <boolean> | default = false]
```

### querier
pkg/ingester/ingester.go (36 additions, 7 deletions)

@@ -179,6 +179,8 @@ type Config struct {
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"`

ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"`

ChunksQueryIgnoreCancellation bool `yaml:"chunks_query_ignore_cancellation" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -197,6 +199,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.")
f.BoolVar(&cfg.ChunksQueryIgnoreCancellation, "ingester.chunks-query-ignore-cancellation", false, "Ignore cancellation when querying chunks.")
}

func (cfg *Config) Validate() error {
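As a usage note (not part of the diff): the new option is wired up like any other Go flag, so it can be enabled on the mimir command line. A minimal sketch, with `miniConfig` standing in for the much larger `ingester.Config` above:

```go
package main

import (
	"flag"
	"fmt"
)

// miniConfig mimics only the field added in this PR; the real ingester.Config
// carries many more settings.
type miniConfig struct {
	ChunksQueryIgnoreCancellation bool
}

func (c *miniConfig) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&c.ChunksQueryIgnoreCancellation, "ingester.chunks-query-ignore-cancellation", false,
		"Ignore cancellation when querying chunks.")
}

func main() {
	var cfg miniConfig
	fs := flag.NewFlagSet("mimir", flag.ExitOnError)
	cfg.registerFlags(fs)

	// Equivalent to passing the flag on the mimir command line.
	_ = fs.Parse([]string{"-ingester.chunks-query-ignore-cancellation=true"})

	fmt.Println(cfg.ChunksQueryIgnoreCancellation) // true
}
```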
@@ -1612,6 +1615,8 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024

// QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
start := time.Now()

if err := i.checkRunning(); err != nil {
return err
}
@@ -1667,12 +1672,23 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
}

if streamType == QueryStreamChunks {
chunksCtx := ctx
if i.cfg.ChunksQueryIgnoreCancellation {
// Pass an independent context to help investigate a problem with ingester queries
// getting canceled. Prior to https://github.com/grafana/mimir/pull/6085, Prometheus chunk
// queriers actually ignored the context, so we are emulating that behavior.
chunksCtx = context.WithoutCancel(ctx)
}
if req.StreamingChunksBatchSize > 0 {
level.Debug(spanlog).Log("msg", "using executeStreamingQuery")
numSeries, numSamples, err = i.executeStreamingQuery(ctx, db, int64(from), int64(through), matchers, shard, stream, req.StreamingChunksBatchSize, spanlog)
numSeries, numSamples, err = i.executeStreamingQuery(chunksCtx, db, int64(from), int64(through), matchers, shard, stream, req.StreamingChunksBatchSize, spanlog)
} else {
level.Debug(spanlog).Log("msg", "using executeChunksQuery")
numSeries, numSamples, err = i.executeChunksQuery(ctx, db, int64(from), int64(through), matchers, shard, stream)
numSeries, numSamples, err = i.executeChunksQuery(chunksCtx, db, int64(from), int64(through), matchers, shard, stream)
}

if i.cfg.ChunksQueryIgnoreCancellation && (ctx.Err() != nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
dumpContextError(ctx, err, start, spanlog)
}
} else {
level.Debug(spanlog).Log("msg", "using executeSamplesQuery")
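For readers unfamiliar with `context.WithoutCancel` (added in Go 1.21), here is a minimal standalone sketch of the semantics the `chunksCtx` branch above relies on; `ctxKey` and the tenant value are illustrative, not from the PR:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

func main() {
	// parent stands in for the per-request gRPC context.
	parent, cancel := context.WithCancel(context.Background())
	parent = context.WithValue(parent, ctxKey("tenant"), "user-1")

	// Detach: the child never observes the parent's cancellation.
	detached := context.WithoutCancel(parent)

	cancel() // e.g. the caller goes away mid-query

	fmt.Println(parent.Err())   // context.Canceled
	fmt.Println(detached.Err()) // <nil>, cancellation is ignored

	// Values (tenant ID, tracing metadata, ...) still flow through.
	fmt.Println(detached.Value(ctxKey("tenant"))) // user-1
}
```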
@@ -1688,6 +1704,19 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

// Dump context error for diagnosis.
func dumpContextError(ctx context.Context, err error, start time.Time, spanlog *spanlogger.SpanLogger) {
deadline, deadlineSet := ctx.Deadline()
var timeout string
if deadlineSet {
timeout = fmt.Sprintf("%.2f seconds", deadline.Sub(start).Seconds())
} else {
timeout = "not set"
}
level.Debug(spanlog).Log("msg", "query context error", "cause", context.Cause(ctx), "timeout", timeout,
"err", err)
}

func (i *Ingester) executeSamplesQuery(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, shard *sharding.ShardSelector, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(from, through)
if err != nil {
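A side note on `dumpContextError` above: the logged `timeout` is the distance from the `start` captured at the top of `QueryStream` to the context deadline, i.e. the effective per-request timeout. A standalone sketch with an assumed 30-second deadline:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	start := time.Now() // mirrors the start captured at the top of QueryStream

	// Assume the caller set a 30s deadline on the request context.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if deadline, ok := ctx.Deadline(); ok {
		// Same arithmetic as dumpContextError: deadline minus start.
		fmt.Printf("timeout: %.2f seconds\n", deadline.Sub(start).Seconds()) // ~30.00 seconds
	} else {
		fmt.Println("timeout: not set")
	}
}
```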
@@ -1796,7 +1825,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t
// It's not required to return sorted series because series are sorted by the Mimir querier.
ss := q.Select(ctx, false, hints, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
return 0, 0, errors.Wrap(ss.Err(), "selecting series from ChunkQuerier")
}

chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
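The `errors.Wrap` calls introduced in this file add call-site detail without hiding the cause. Assuming the `errors` import is `github.com/pkg/errors` (which has supported the standard `Unwrap` protocol since v0.9), a minimal sketch of why the `errors.Is(err, context.Canceled)` check in `QueryStream` still matches wrapped errors:

```go
package main

import (
	"context"
	stderrors "errors"
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// A select that failed because the query was canceled, wrapped as in the diff.
	err := errors.Wrap(context.Canceled, "selecting series from ChunkQuerier")

	// The message now records where the error surfaced...
	fmt.Println(err) // selecting series from ChunkQuerier: context canceled

	// ...while the cause stays detectable through the wrapping.
	fmt.Println(stderrors.Is(err, context.Canceled)) // true
}
```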
@@ -1852,7 +1881,7 @@ func (i *Ingester) executeChunksQuery(ctx context.Context, db *userTSDB, from, t

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, err
return 0, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
}

// Final flush any existing metrics
@@ -1943,7 +1972,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk
// Series must be sorted so that they can be read by the querier in the order the PromQL engine expects.
ss := q.Select(ctx, true, hints, matchers...)
if ss.Err() != nil {
return nil, 0, ss.Err()
return nil, 0, errors.Wrap(ss.Err(), "selecting series from ChunkQuerier")
}

seriesInBatch := make([]client.QueryStreamSeries, 0, queryStreamBatchSize)
@@ -1972,7 +2001,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

chunkCount, err := series.ChunkCount()
if err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "getting ChunkSeries chunk count")
}

seriesInBatch = append(seriesInBatch, client.QueryStreamSeries{
@@ -2003,7 +2032,7 @@ func (i *Ingester) sendStreamingQuerySeries(ctx context.Context, q storage.Chunk

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return nil, 0, err
return nil, 0, errors.Wrap(err, "iterating ChunkSeriesSet")
Collaborator:

[nit] I think calling it "iterating" is a bit misleading. I understand this is coming from the comment above, but in practice what's happening here is that we're "sending", not "iterating".

Same comment applies above.

Contributor (author):

I wasn't actually looking at the comment; I was just relating to the fact that ss.Err() returns any error having occurred during iteration of the ss iterator. I don't understand how "sending" would be more descriptive, since any error from ss.Err() would come from failed ChunkSeries iteration, not sending. A failed send should result in an error returned from client.SendQueryStream, no?

}

return allSeriesList, seriesCount, nil
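To make the contract behind this exchange concrete: `storage`-package series sets report iteration failures through `Err()` after `Next()` returns false, while a failed send surfaces directly from the send call. A hypothetical helper (not code from this PR) illustrating the pattern:

```go
package example

import (
	"github.com/pkg/errors" // assumed, as above
	"github.com/prometheus/prometheus/storage"
)

// drainChunkSeriesSet sketches the iterate-then-check pattern used in
// executeChunksQuery and sendStreamingQuerySeries; send is a stand-in for
// batching plus client.SendQueryStream.
func drainChunkSeriesSet(ss storage.ChunkSeriesSet, send func(storage.ChunkSeries) error) error {
	for ss.Next() {
		// A send failure is reported by the send path itself...
		if err := send(ss.At()); err != nil {
			return errors.Wrap(err, "sending series")
		}
	}
	// ...whereas ss.Err() reports an error hit while iterating the set:
	// Next returning false can mean either "done" or "failed".
	return errors.Wrap(ss.Err(), "iterating ChunkSeriesSet")
}
```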