fix MimirIngesterStuckProcessingRecordsFromKafka (#9855)
* fix `MimirIngesterStuckProcessingRecordsFromKafka`

The alert `MimirIngesterStuckProcessingRecordsFromKafka` relied on the metric `cortex_ingest_storage_reader_buffered_fetch_records_total`, provided by the Kafka client, to identify whether buffers were stuck.

Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function, we need an equivalent metric for when concurrent fetching is in use. This PR adds one. In addition, the new metric also includes the client's buffered records, in case a mixture of non-concurrent and concurrent fetching is used.
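As a rough sketch of the idea (not the exact code in this PR), the gauge can be exposed by summing the records buffered by both fetch paths. The `registerBufferedRecordsMetric` helper name below is hypothetical; `(*kgo.Client).BufferedFetchRecords()` is franz-go's existing accessor for the client's own fetch buffer:

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/twmb/franz-go/pkg/kgo"
)

// registerBufferedRecordsMetric is a hypothetical helper sketching the approach:
// the gauge covers records buffered in both fetch paths, so it stays meaningful
// when non-concurrent and concurrent fetching are mixed.
func registerBufferedRecordsMetric(reg prometheus.Registerer, client *kgo.Client, fetchers fetcher) prometheus.GaugeFunc {
	return promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name: "cortex_ingest_storage_reader_buffered_fetched_records",
		Help: "Number of records fetched from Kafka but not yet processed.",
	}, func() float64 {
		buffered := client.BufferedFetchRecords() // records buffered inside the Kafka client
		if fetchers != nil {
			buffered += fetchers.BufferedRecords() // records buffered by the concurrent fetchers
		}
		return float64(buffered)
	})
}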

* Add changelog

Signed-off-by: gotjosh <[email protected]>

* restructure metric assignment

Signed-off-by: gotjosh <[email protected]>

* Remove the registry

Signed-off-by: gotjosh <[email protected]>

* Fix helm

Signed-off-by: gotjosh <[email protected]>

* Protected the fetchers

Signed-off-by: gotjosh <[email protected]>

* Change log to debug

Signed-off-by: gotjosh <[email protected]>

* make `BufferedRecords` int64 and remove debug logs

Signed-off-by: gotjosh <[email protected]>

* Move buffered records increment location

Signed-off-by: gotjosh <[email protected]>

* Use atomic functions for locking / unlocking the client and fetcher.

Signed-off-by: gotjosh <[email protected]>

* assert on buffered records

Signed-off-by: gotjosh <[email protected]>

* reset the records buffer when `stop()` is called.

Signed-off-by: gotjosh <[email protected]>

* Fix test

Signed-off-by: Marco Pracucci <[email protected]>

* Changed how buffered records are tracked, improved unit tests and used atomic instead of a mutex to protect client/fetcher access

Signed-off-by: Marco Pracucci <[email protected]>

* Fix

Signed-off-by: Marco Pracucci <[email protected]>

* Fix comment

Signed-off-by: Marco Pracucci <[email protected]>

* Fix comment

Signed-off-by: Marco Pracucci <[email protected]>

* Go back to Josh's implementation of buffered records tracking, which has better coverage of all buffered records

Signed-off-by: Marco Pracucci <[email protected]>

* Use atomic for fetcher too

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: gotjosh <[email protected]>
Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
gotjosh and pracucci authored Nov 13, 2024
1 parent 5fd2424 commit 72cbd83
Showing 9 changed files with 477 additions and 156 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -76,6 +76,7 @@
 ### Mixin
 
 * [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758
+* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855
 * [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617
 * [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659
 * [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697
@@ -1129,8 +1129,7 @@ spec:
               rate(cortex_ingest_storage_reader_requests_total[5m])
             ) == 0)
             and
-            # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-            (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+            (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
           for: 5m
           labels:
             severity: critical
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
@@ -1103,8 +1103,7 @@ groups:
             rate(cortex_ingest_storage_reader_requests_total[5m])
           ) == 0)
           and
-          # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-          (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+          (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
         for: 5m
         labels:
           severity: critical
3 changes: 1 addition & 2 deletions operations/mimir-mixin-compiled/alerts.yaml
@@ -1117,8 +1117,7 @@ groups:
             rate(cortex_ingest_storage_reader_requests_total[5m])
           ) == 0)
           and
-          # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-          (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+          (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
         for: 5m
         labels:
           severity: critical
3 changes: 1 addition & 2 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
@@ -151,8 +151,7 @@
           rate(cortex_ingest_storage_reader_requests_total[5m])
         ) == 0)
         and
-        # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
-        (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
+        (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0)
       ||| % $._config,
       labels: {
         severity: 'critical',
116 changes: 90 additions & 26 deletions pkg/storage/ingest/fetcher.go
@@ -48,6 +48,9 @@ type fetcher interface {
 
 	// Stop stops the fetcher.
 	Stop()
+
+	// BufferedRecords returns the number of records that have been fetched but not yet consumed.
+	BufferedRecords() int64
 }
 
 // fetchWant represents a range of offsets to fetch.
Expand Down Expand Up @@ -220,20 +223,18 @@ type concurrentFetchers struct {

minBytesWaitTime time.Duration

orderedFetches chan fetchResult
// orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches().
// Since all records must be polled in order, the fetches written to this channel are after
// ordering.
orderedFetches chan fetchResult

lastReturnedRecord int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
trackCompressedBytes bool
}

// Stop implements fetcher
func (r *concurrentFetchers) Stop() {
close(r.done)

r.wg.Wait()
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
bufferedFetchedRecords *atomic.Int64
}

// newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
Expand Down Expand Up @@ -267,18 +268,19 @@ func newConcurrentFetchers(
return nil, fmt.Errorf("resolving offset to start consuming from: %w", err)
}
f := &concurrentFetchers{
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
bufferedFetchedRecords: atomic.NewInt64(0),
client: client,
logger: logger,
topicName: topic,
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
tracer: recordsTracer(),
orderedFetches: make(chan fetchResult),
done: make(chan struct{}),
}

topics, err := kadm.NewClient(client).ListTopics(ctx, topic)
@@ -299,6 +301,32 @@ func newConcurrentFetchers(
 	return f, nil
 }
 
+// BufferedRecords implements fetcher.
+func (r *concurrentFetchers) BufferedRecords() int64 {
+	return r.bufferedFetchedRecords.Load()
+}
+
+// Stop implements fetcher.
+func (r *concurrentFetchers) Stop() {
+	// Ensure it's not already stopped.
+	select {
+	case _, ok := <-r.done:
+		if !ok {
+			return
+		}
+	default:
+	}
+
+	close(r.done)
+	r.wg.Wait()
+
+	// When the fetcher is stopped, buffered records are intentionally dropped. For this reason,
+	// we do reset the counter of buffered records here.
+	r.bufferedFetchedRecords.Store(0)
+
+	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
+}
+
 // Update implements fetcher
 func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) {
 	r.Stop()
Expand All @@ -315,6 +343,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
case <-ctx.Done():
return kgo.Fetches{}, ctx
case f := <-r.orderedFetches:
// The records have been polled from the buffer, so we can now decrease the number of
// buffered records. It's important to note that we decrease it by the number of actually
// buffered records and not by the number of records returned by PollFetchers(), which
// could be lower if some records are discarded because "old" (already returned by previous
// PollFetches() calls).
r.bufferedFetchedRecords.Sub(int64(len(f.Records)))

firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

Expand Down Expand Up @@ -499,6 +534,10 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg
attemptSpan.SetTag("attempt", attempt)

f := r.fetchSingle(ctx, w)

// We increase the count of buffered records as soon as we fetch them.
r.bufferedFetchedRecords.Add(int64(len(f.Records)))

f = f.Merge(previousResult)
previousResult = f
if f.Err != nil {
Expand Down Expand Up @@ -586,18 +625,39 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
go r.run(ctx, wants, logger, highWatermark)
}

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()

var (
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)
nextResult chan fetchResult
// nextFetch is the next records fetch operation we want to issue to one of the running workers.
// It contains the offset range to fetch and a channel where the result should be written to.
nextFetch = fetchWantFrom(startOffset, recordsPerFetch)

// nextResult is the channel where we expect a worker will write the result of the next fetch
// operation. This result is the next result that will be returned to PollFetches(), guaranteeing
// records ordering.
nextResult chan fetchResult

// pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results
// are ordered in the same order these results should be returned to PollFetches(), so the first one
// in the list is the next one that should be returned, unless nextResult is valued (if nextResult
// is valued, then nextResult is the next and the first item in the pendingResults list is the
// 2nd next one).
pendingResults = list.New()

bufferedResult fetchResult
readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty
// bufferedResult is the next fetch that should be polled by PollFetches().
bufferedResult fetchResult

// readyBufferedResults channel gets continuously flipped between nil and the actual channel
// where PollFetches() reads from. This channel is nil when there are no ordered buffered
// records ready to be written to the channel where PollFetches(), and is non-nil when there
// are some ordered buffered records ready.
//
// It is guaranteed that this channel is non-nil when bufferedResult is non-empty.
readyBufferedResults chan fetchResult
)
nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume

// We need to make sure we don't leak any goroutine given that start is called within a goroutine.
defer r.wg.Done()
for {
refillBufferedResult := nextResult
if readyBufferedResults != nil {
@@ -641,6 +701,10 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu
 			continue
 		}
 		nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))
+
+		// We have some ordered records ready to be sent to PollFetches(). We store the fetch
+		// result in bufferedResult, and we flip readyBufferedResults to the channel used by
+		// PollFetches().
 		bufferedResult = result
 		readyBufferedResults = r.orderedFetches
 
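For illustration, here is a minimal sketch of how the new accounting can be exercised in a test. The `newTestConcurrentFetchers` and `produceTestRecords` helpers are hypothetical stand-ins for the real test setup, and the sketch assumes all produced records arrive in a single fetch result:

fetchers := newTestConcurrentFetchers(t) // hypothetical: builds a concurrentFetchers against a test Kafka cluster
produceTestRecords(t, 10)                // hypothetical: writes 10 records to the consumed partition

// Records count as buffered as soon as a background worker fetches them.
require.Eventually(t, func() bool {
	return fetchers.BufferedRecords() > 0
}, 5*time.Second, 100*time.Millisecond)

// Polling drains the buffer: the counter is decreased by the number of buffered
// records in the returned fetch, even if fewer records are handed to the caller.
fetches, _ := fetchers.PollFetches(context.Background())
require.Equal(t, 10, fetches.NumRecords())
require.Equal(t, int64(0), fetchers.BufferedRecords())

// Stop() intentionally drops anything still buffered, so the counter resets to zero.
fetchers.Stop()
require.Equal(t, int64(0), fetchers.BufferedRecords())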