From 72cbd836389c935ad79419f8124de73796bbb42a Mon Sep 17 00:00:00 2001 From: gotjosh Date: Wed, 13 Nov 2024 13:38:11 +0000 Subject: [PATCH] fix `MimirIngesterStuckProcessingRecordsFromKafka` (#9855) * fix `MimirIngesterStuckProcessingRecordsFromKafka` The alert `MimirIngesterStuckProcessingRecordsFromKafka` relied on the metric `cortex_ingest_storage_reader_buffered_fetch_records_total ` provided by the Kafka client to identify wether we had stuck buffers or not. Now that we've implemented concurrent fetching from Kafka and bypass the client's polling function we needed an equivalent metric when using concurrent fetching. This PR does that; In addition to that - the metric also takes the client's buffered records In case we do use a mixture of non-concurrent fetching and concurrent fetching. * Add changelog Signed-off-by: gotjosh * reestrcture metric assignment Signed-off-by: gotjosh * Remove the registry Signed-off-by: gotjosh * Fix helm Signed-off-by: gotjosh * Protected the fetchers Signed-off-by: gotjosh * Change log to debug Signed-off-by: gotjosh * make `BufferedRecords` int64 and remove debug logs Signed-off-by: gotjosh * Move buffered records increment location Signed-off-by: gotjosh * Use atomic functions for locking / unlocking the client and fetcher. Signed-off-by: gotjosh * assert on buffered records Signed-off-by: gotjosh * reset the records buffer when `stop()` is called. Signed-off-by: gotjosh * Fix test Signed-off-by: Marco Pracucci * Changed how buffered records are tracked, improved unit tests and used atomic instead of a mutex to protect client/fetcher access Signed-off-by: Marco Pracucci * Fix Signed-off-by: Marco Pracucci * Fix comment Signed-off-by: Marco Pracucci * Fix comment Signed-off-by: Marco Pracucci * Get back to Josh implementation of buffered records tracking which has better coverage of all buffered records Signed-off-by: Marco Pracucci * Use atomic for fetcher too Signed-off-by: Marco Pracucci --------- Signed-off-by: gotjosh Signed-off-by: Marco Pracucci Co-authored-by: Marco Pracucci --- CHANGELOG.md | 1 + .../metamonitoring/mixin-alerts.yaml | 3 +- .../alerts.yaml | 3 +- operations/mimir-mixin-compiled/alerts.yaml | 3 +- .../alerts/ingest-storage.libsonnet | 3 +- pkg/storage/ingest/fetcher.go | 116 ++++++-- pkg/storage/ingest/fetcher_test.go | 166 ++++++++++-- pkg/storage/ingest/reader.go | 91 +++++-- pkg/storage/ingest/reader_test.go | 247 ++++++++++++------ 9 files changed, 477 insertions(+), 156 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb6f93c93a1..f00292d282b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ ### Mixin * [CHANGE] Remove backwards compatibility for `thanos_memcached_` prefixed metrics in dashboards and alerts removed in 2.12. #9674 #9758 +* [CHANGE] Reworked the alert `MimirIngesterStuckProcessingRecordsFromKafka` to also work when concurrent fetching is enabled. #9855 * [ENHANCEMENT] Unify ingester autoscaling panels on 'Mimir / Writes' dashboard to work for both ingest-storage and non-ingest-storage autoscaling. #9617 * [ENHANCEMENT] Alerts: Enable configuring job prefix for alerts to prevent clashes with metrics from Loki/Tempo. #9659 * [ENHANCEMENT] Dashboards: visualize the age of source blocks in the "Mimir / Compactor" dashboard. #9697 diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml index b45db52d7ef..ebb3c5fd135 100644 --- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml +++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml @@ -1129,8 +1129,7 @@ spec: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml index 4f87df5ba22..77846f2f2d6 100644 --- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml +++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml @@ -1103,8 +1103,7 @@ groups: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml index dc7cdd4e8eb..cef80213c98 100644 --- a/operations/mimir-mixin-compiled/alerts.yaml +++ b/operations/mimir-mixin-compiled/alerts.yaml @@ -1117,8 +1117,7 @@ groups: rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) for: 5m labels: severity: critical diff --git a/operations/mimir-mixin/alerts/ingest-storage.libsonnet b/operations/mimir-mixin/alerts/ingest-storage.libsonnet index 371865130b0..3cca0186d4b 100644 --- a/operations/mimir-mixin/alerts/ingest-storage.libsonnet +++ b/operations/mimir-mixin/alerts/ingest-storage.libsonnet @@ -151,8 +151,7 @@ rate(cortex_ingest_storage_reader_requests_total[5m]) ) == 0) and - # NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records. - (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0) + (sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetched_records) > 0) ||| % $._config, labels: { severity: 'critical', diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 05bffcd40c5..4ecbc131d27 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -48,6 +48,9 @@ type fetcher interface { // Stop stops the fetcher. Stop() + + // BufferedRecords returns the number of records that have been fetched but not yet consumed. + BufferedRecords() int64 } // fetchWant represents a range of offsets to fetch. @@ -220,20 +223,18 @@ type concurrentFetchers struct { minBytesWaitTime time.Duration - orderedFetches chan fetchResult + // orderedFetches is a channel where we write fetches that are ready to be polled by PollFetches(). + // Since all records must be polled in order, the fetches written to this channel are after + // ordering. + orderedFetches chan fetchResult + lastReturnedRecord int64 startOffsets *genericOffsetReader[int64] // trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes. trackCompressedBytes bool -} -// Stop implements fetcher -func (r *concurrentFetchers) Stop() { - close(r.done) - - r.wg.Wait() - level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) + bufferedFetchedRecords *atomic.Int64 } // newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset. @@ -267,18 +268,19 @@ func newConcurrentFetchers( return nil, fmt.Errorf("resolving offset to start consuming from: %w", err) } f := &concurrentFetchers{ - client: client, - logger: logger, - topicName: topic, - partitionID: partition, - metrics: metrics, - minBytesWaitTime: minBytesWaitTime, - lastReturnedRecord: startOffset - 1, - startOffsets: startOffsetsReader, - trackCompressedBytes: trackCompressedBytes, - tracer: recordsTracer(), - orderedFetches: make(chan fetchResult), - done: make(chan struct{}), + bufferedFetchedRecords: atomic.NewInt64(0), + client: client, + logger: logger, + topicName: topic, + partitionID: partition, + metrics: metrics, + minBytesWaitTime: minBytesWaitTime, + lastReturnedRecord: startOffset - 1, + startOffsets: startOffsetsReader, + trackCompressedBytes: trackCompressedBytes, + tracer: recordsTracer(), + orderedFetches: make(chan fetchResult), + done: make(chan struct{}), } topics, err := kadm.NewClient(client).ListTopics(ctx, topic) @@ -299,6 +301,32 @@ func newConcurrentFetchers( return f, nil } +// BufferedRecords implements fetcher. +func (r *concurrentFetchers) BufferedRecords() int64 { + return r.bufferedFetchedRecords.Load() +} + +// Stop implements fetcher. +func (r *concurrentFetchers) Stop() { + // Ensure it's not already stopped. + select { + case _, ok := <-r.done: + if !ok { + return + } + default: + } + + close(r.done) + r.wg.Wait() + + // When the fetcher is stopped, buffered records are intentionally dropped. For this reason, + // we do reset the counter of buffered records here. + r.bufferedFetchedRecords.Store(0) + + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) +} + // Update implements fetcher func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) { r.Stop() @@ -315,6 +343,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont case <-ctx.Done(): return kgo.Fetches{}, ctx case f := <-r.orderedFetches: + // The records have been polled from the buffer, so we can now decrease the number of + // buffered records. It's important to note that we decrease it by the number of actually + // buffered records and not by the number of records returned by PollFetchers(), which + // could be lower if some records are discarded because "old" (already returned by previous + // PollFetches() calls). + r.bufferedFetchedRecords.Sub(int64(len(f.Records))) + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) @@ -499,6 +534,10 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logg attemptSpan.SetTag("attempt", attempt) f := r.fetchSingle(ctx, w) + + // We increase the count of buffered records as soon as we fetch them. + r.bufferedFetchedRecords.Add(int64(len(f.Records))) + f = f.Merge(previousResult) previousResult = f if f.Err != nil { @@ -586,18 +625,39 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu go r.run(ctx, wants, logger, highWatermark) } + // We need to make sure we don't leak any goroutine given that start is called within a goroutine. + defer r.wg.Done() + var ( - nextFetch = fetchWantFrom(startOffset, recordsPerFetch) - nextResult chan fetchResult + // nextFetch is the next records fetch operation we want to issue to one of the running workers. + // It contains the offset range to fetch and a channel where the result should be written to. + nextFetch = fetchWantFrom(startOffset, recordsPerFetch) + + // nextResult is the channel where we expect a worker will write the result of the next fetch + // operation. This result is the next result that will be returned to PollFetches(), guaranteeing + // records ordering. + nextResult chan fetchResult + + // pendingResults is the list of all fetchResult of all inflight fetch operations. Pending results + // are ordered in the same order these results should be returned to PollFetches(), so the first one + // in the list is the next one that should be returned, unless nextResult is valued (if nextResult + // is valued, then nextResult is the next and the first item in the pendingResults list is the + // 2nd next one). pendingResults = list.New() - bufferedResult fetchResult - readyBufferedResults chan fetchResult // this is non-nil when bufferedResult is non-empty + // bufferedResult is the next fetch that should be polled by PollFetches(). + bufferedResult fetchResult + + // readyBufferedResults channel gets continuously flipped between nil and the actual channel + // where PollFetches() reads from. This channel is nil when there are no ordered buffered + // records ready to be written to the channel where PollFetches(), and is non-nil when there + // are some ordered buffered records ready. + // + // It is guaranteed that this channel is non-nil when bufferedResult is non-empty. + readyBufferedResults chan fetchResult ) nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume - // We need to make sure we don't leak any goroutine given that start is called within a goroutine. - defer r.wg.Done() for { refillBufferedResult := nextResult if readyBufferedResults != nil { @@ -641,6 +701,10 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concu continue } nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records)) + + // We have some ordered records ready to be sent to PollFetches(). We store the fetch + // result in bufferedResult, and we flip readyBufferedResults to the channel used by + // PollFetches(). bufferedResult = result readyBufferedResults = r.orderedFetches diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 89b85ffc68c..f96f27d5b47 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -303,9 +304,12 @@ func TestConcurrentFetchers(t *testing.T) { assert.Zero(t, fetches.NumRecords()) assert.Error(t, fetchCtx.Err(), "Expected context to be cancelled") + assert.Zero(t, fetchers.BufferedRecords()) }) t.Run("cold replay", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -321,9 +325,14 @@ func TestConcurrentFetchers(t *testing.T) { fetches, _ := fetchers.PollFetches(ctx) assert.Equal(t, fetches.NumRecords(), 5) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetch records produced after startup", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -339,6 +348,9 @@ func TestConcurrentFetchers(t *testing.T) { fetches, _ := fetchers.PollFetches(ctx) assert.Equal(t, fetches.NumRecords(), 3) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("slow processing of fetches", func(t *testing.T) { @@ -357,26 +369,42 @@ func TestConcurrentFetchers(t *testing.T) { var wg sync.WaitGroup wg.Add(1) + go func() { defer wg.Done() consumedRecords := 0 for consumedRecords < 10 { fetches, _ := fetchers.PollFetches(ctx) - time.Sleep(1000 * time.Millisecond) // Simulate slow processing consumedRecords += fetches.NumRecords() + + // Simulate slow processing. + time.Sleep(200 * time.Millisecond) } assert.Equal(t, 10, consumedRecords) }() - // Produce more records while processing is slow - for i := 5; i < 10; i++ { - produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) - } + // Slowly produce more records while processing is slow too. This increase the chances + // of progressive fetches done by the consumer. + wg.Add(1) + + go func() { + defer wg.Done() + + for i := 5; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + time.Sleep(200 * time.Millisecond) + } + }() wg.Wait() + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fast processing of fetches", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -390,25 +418,27 @@ func TestConcurrentFetchers(t *testing.T) { produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - consumedRecords := 0 - for consumedRecords < 10 { - fetches, _ := fetchers.PollFetches(ctx) - consumedRecords += fetches.NumRecords() - // no processing delay - } - assert.Equal(t, 10, consumedRecords) - }() + // Consume all expected records. + consumedRecords := 0 + for consumedRecords < 10 { + fetches, _ := fetchers.PollFetches(ctx) + consumedRecords += fetches.NumRecords() + } + assert.Equal(t, 10, consumedRecords) - wg.Wait() + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetch with different concurrency levels", func(t *testing.T) { + t.Parallel() + for _, concurrency := range []int{1, 2, 4} { + concurrency := concurrency + t.Run(fmt.Sprintf("concurrency-%d", concurrency), func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -429,11 +459,16 @@ func TestConcurrentFetchers(t *testing.T) { } assert.Equal(t, 20, totalRecords) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) } }) t.Run("start from mid-stream offset", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -472,9 +507,14 @@ func TestConcurrentFetchers(t *testing.T) { "new-record-1", "new-record-2", }, fetchedRecordsContents) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("synchronous produce and fetch", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -507,10 +547,15 @@ func TestConcurrentFetchers(t *testing.T) { // Verify fetched records assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) } }) t.Run("concurrency can be updated", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() rec1 := []byte("record-1") @@ -548,10 +593,13 @@ func TestConcurrentFetchers(t *testing.T) { fetchers.Update(ctx, 10, 10) produceRecordAndAssert(rec3) + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("update concurrency with continuous production", func(t *testing.T) { t.Parallel() + const ( testDuration = 10 * time.Second produceInterval = 10 * time.Millisecond @@ -633,6 +681,9 @@ func TestConcurrentFetchers(t *testing.T) { "Record %d has unexpected content: %s", i, string(record.Value)) } + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) + // Log some statistics t.Logf("Total produced: %d, Total fetched: %d", totalProduced, totalFetched) t.Logf("Fetched with initial concurrency: %d", initialFetched) @@ -642,6 +693,7 @@ func TestConcurrentFetchers(t *testing.T) { t.Run("consume from end and update immediately", func(t *testing.T) { t.Parallel() + const ( initialRecords = 100 additionalRecords = 50 @@ -694,6 +746,9 @@ func TestConcurrentFetchers(t *testing.T) { "Record %d has unexpected content: %s", i, string(record.Value)) } + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) + // Log some statistics t.Logf("Total records produced: %d", initialRecords+additionalRecords) t.Logf("Records produced after start: %d", additionalRecords) @@ -756,6 +811,9 @@ func TestConcurrentFetchers(t *testing.T) { } assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("staggered production with one less than multiple of concurrency and records per fetch", func(t *testing.T) { @@ -823,6 +881,9 @@ func TestConcurrentFetchers(t *testing.T) { return nil, errors.New("mocked error"), true }) + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("fetchers do not request offset beyond high watermark", func(t *testing.T) { @@ -898,9 +959,14 @@ func TestConcurrentFetchers(t *testing.T) { // Verify the number and content of fetched records assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes, "Should fetch all produced records") + + // We expect no more records returned by PollFetches() and no buffered records. + pollFetchesAndAssertNoRecords(t, fetchers) }) t.Run("starting to run against a broken broker fails creating the fetchers", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -923,7 +989,9 @@ func TestConcurrentFetchers(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partitionID, reg) + metrics := newReaderMetrics(partitionID, reg, func() float64 { + return 0 + }) client := newKafkaProduceClient(t, clusterAddr) @@ -955,12 +1023,43 @@ func TestConcurrentFetchers(t *testing.T) { assert.ErrorContains(t, err, "failed to find topic ID") assert.ErrorIs(t, err, mockErr) }) + + t.Run("should reset the buffered records count when stopping", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + client := newKafkaProduceClient(t, clusterAddr) + + fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch) + + // Produce some records. + for i := 0; i < 10; i++ { + produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i))) + } + + // We are not consuming the records, so we expect the count of buffered records to increase. + // The actual number of buffered records may change due to concurrency, so we just check + // that there are some buffered records. + test.Poll(t, time.Second, true, func() interface{} { + return fetchers.BufferedRecords() > 0 + }) + + // Stop the fetchers. + fetchers.Stop() + + // Even if there were some buffered records we expect the count to be reset to 0 when stopping + // because the Stop() intentionally discard any buffered record. + require.Zero(t, fetchers.BufferedRecords()) + }) } func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers { logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - metrics := newReaderMetrics(partition, reg) + metrics := newReaderMetrics(partition, reg, func() float64 { return 1 }) // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. @@ -993,6 +1092,33 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli return f } +// pollFetchesAndAssertNoRecords ensures that PollFetches() returns 0 records and there are +// no buffered records in fetchers. Since some records are discarded in the PollFetches(), +// we may have to call it multiple times to process all buffered records that need to be +// discarded. +func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *concurrentFetchers) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + for { + fetches, returnCtx := fetchers.PollFetches(ctx) + if errors.Is(returnCtx.Err(), context.DeadlineExceeded) { + break + } + + // We always expect that PollFetches() returns zero records. + require.Len(t, fetches.Records(), 0) + + // If there are no buffered records, we're good. We can end the assertion. + if fetchers.BufferedRecords() == 0 { + return + } + } + + // We stopped polling fetches. We have to make sure there are no buffered records. + require.Zero(t, fetchers.BufferedRecords()) +} + type waiterFunc func() func (w waiterFunc) Wait() { w() } diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index f7000f5fbe1..43c4e664306 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -79,8 +79,11 @@ type PartitionReader struct { consumerGroup string concurrentFetchersMinBytesMaxWaitTime time.Duration - client *kgo.Client - fetcher fetcher + // client and fetcher are both start after PartitionReader creation. Fetcher could also be + // replaced during PartitionReader lifetime. To avoid concurrency issues with functions + // getting their pointers (e.g. BufferedRecords()) we use atomic to protect. + client atomic.Pointer[kgo.Client] + fetcher atomic.Pointer[fetcher] newConsumer consumerFactory metrics readerMetrics @@ -110,13 +113,14 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri partitionID: partitionID, newConsumer: consumer, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), - metrics: newReaderMetrics(partitionID, reg), consumedOffsetWatcher: newPartitionOffsetWatcher(), concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime, logger: log.With(logger, "partition", partitionID), reg: reg, } + r.metrics = newReaderMetrics(partitionID, reg, func() float64 { return float64(r.BufferedRecords()) }) + r.Service = services.NewBasicService(r.start, r.run, r.stop) return r, nil } @@ -131,6 +135,20 @@ func (r *PartitionReader) Update(_ context.Context, _, _ int) { // Given the partition reader has no concurrency it doesn't support updates. } +func (r *PartitionReader) BufferedRecords() int64 { + var fcount, ccount int64 + + if f := r.getFetcher(); f != nil && f != r { + fcount = f.BufferedRecords() + } + + if c := r.client.Load(); c != nil { + ccount = c.BufferedFetchRecords() + } + + return fcount + ccount +} + func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if r.kafkaCfg.AutoCreateTopicEnabled { setDefaultNumberOfPartitionsForAutocreatedTopics(r.kafkaCfg, r.logger) @@ -154,14 +172,15 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { } // Create a Kafka client without configuring any partition to consume (it will be done later). - r.client, err = NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger) + client, err := NewKafkaReaderClient(r.kafkaCfg, r.metrics.kprom, r.logger) if err != nil { return errors.Wrap(err, "creating kafka reader client") } + r.client.Store(client) - r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg) + r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client.Load()), r.partitionID, r.consumerGroup, r.logger, r.reg) - offsetsClient := newPartitionOffsetClient(r.client, r.kafkaCfg.Topic, r.reg, r.logger) + offsetsClient := newPartitionOffsetClient(r.client.Load(), r.kafkaCfg.Topic, r.reg, r.logger) // It's ok to have the start offset slightly outdated. // We only need this offset accurate if we fall behind or if we start and the log gets truncated from beneath us. @@ -194,26 +213,26 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { // // To make it happen, we do pause the fetching first and then we configure consumption. The consumption // will be kept paused until the explicit ResumeFetchPartitions() is called. - r.client.PauseFetchPartitions(map[string][]int32{ + r.client.Load().PauseFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) + f, err := newConcurrentFetchers(ctx, r.client.Load(), r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics) if err != nil { return errors.Wrap(err, "creating concurrent fetchers during startup") } - r.fetcher = f + r.setFetcher(f) } else { // When concurrent fetch is disabled we read records directly from the Kafka client, so we want it // to consume the partition. - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(startOffset)}, }) - r.fetcher = r + r.setFetcher(r) } // Enforce the max consumer lag (if enabled). @@ -243,12 +262,12 @@ func (r *PartitionReader) stopDependencies() error { } } - if r.fetcher != nil { - r.fetcher.Stop() + if f := r.getFetcher(); f != nil { + f.Stop() } - if r.client != nil { - r.client.Close() + if c := r.client.Load(); c != nil { + c.Close() } return nil @@ -278,12 +297,12 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency > 0 { // No need to switch the fetcher, just update the records per fetch. - r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) + r.getFetcher().Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch) return } if r.kafkaCfg.StartupFetchConcurrency > 0 && r.kafkaCfg.OngoingFetchConcurrency == 0 { - if r.fetcher == r { + if r.getFetcher() == r { // This method has been called before, no need to switch the fetcher. return } @@ -291,11 +310,11 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { level.Info(r.logger).Log("msg", "partition reader is switching to non-concurrent fetcher") // Stop the current fetcher before replacing it. - r.fetcher.Stop() + r.getFetcher().Stop() // We need to switch to franz-go for ongoing fetches. // If we've already fetched some records, we should discard them from franz-go and start from the last consumed offset. - r.fetcher = r + r.setFetcher(r) lastConsumed := r.consumedOffsetWatcher.LastConsumedOffset() if lastConsumed == -1 { @@ -303,7 +322,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // // The franz-go client is initialized to start consuming from the same place as the other fetcher. // We can just use the client, but we have to resume the fetching because it was previously paused. - r.client.ResumeFetchPartitions(map[string][]int32{ + r.client.Load().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) return @@ -313,13 +332,13 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { // from a clean setup and have the guarantee that we're not going to read any previously buffered record, // we do remove the partition consumption (this clears the buffer), then we resume the fetching and finally // we add the consumption back. - r.client.RemoveConsumePartitions(map[string][]int32{ + r.client.Load().RemoveConsumePartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.ResumeFetchPartitions(map[string][]int32{ + r.client.Load().ResumeFetchPartitions(map[string][]int32{ r.kafkaCfg.Topic: {r.partitionID}, }) - r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + r.client.Load().AddConsumePartitions(map[string]map[int32]kgo.Offset{ // Resume from the next unconsumed offset. r.kafkaCfg.Topic: {r.partitionID: kgo.NewOffset().At(lastConsumed + 1)}, }) @@ -327,7 +346,7 @@ func (r *PartitionReader) switchToOngoingFetcher(ctx context.Context) { } func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error { - fetches, fetchCtx := r.fetcher.PollFetches(ctx) + fetches, fetchCtx := r.getFetcher().PollFetches(ctx) // Propagate the fetching span to consuming the records. ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx)) r.recordFetchesMetrics(fetches, delayObserver) @@ -799,11 +818,24 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo return err } +func (r *PartitionReader) setFetcher(f fetcher) { + r.fetcher.Store(&f) +} + +func (r *PartitionReader) getFetcher() fetcher { + pointer := r.fetcher.Load() + if pointer == nil { + return nil + } + + return *pointer +} + func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, fetchContext context.Context) { defer func(start time.Time) { r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds()) }(time.Now()) - return r.client.PollFetches(ctx), ctx + return r.client.Load().PollFetches(ctx), ctx } type partitionCommitter struct { @@ -937,6 +969,7 @@ func (r *partitionCommitter) stop(error) error { } type readerMetrics struct { + bufferedFetchedRecords prometheus.GaugeFunc receiveDelayWhenStarting prometheus.Observer receiveDelayWhenRunning prometheus.Observer recordsPerFetch prometheus.Histogram @@ -950,7 +983,7 @@ type readerMetrics struct { kprom *kprom.Metrics } -func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetrics { +func newReaderMetrics(partitionID int32, reg prometheus.Registerer, bufferedRecordsCollector func() float64) readerMetrics { const component = "partition-reader" receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -973,6 +1006,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric lastConsumedOffset.Set(-1) return readerMetrics{ + bufferedFetchedRecords: promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingest_storage_reader_buffered_fetched_records", + Help: "The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed.", + }, bufferedRecordsCollector), receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"), receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"), recordsPerFetch: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 29aad7b5b5c..432d285c8b3 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -1793,74 +1793,125 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) } -func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFetchIsEnabled(t *testing.T) { +func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenDone(t *testing.T) { const ( topicName = "test" partitionID = 1 ) - var ( - ctx = context.Background() - _, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) - consumedRecordsMx sync.Mutex - consumedRecords []string - ) + tc := map[string]struct { + concurrencyVariant []readerTestCfgOpt + expectedBufferedRecords int + expectedBufferedRecordsFromClient int + }{ + "without concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(0), withOngoingConcurrency(0)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 1, + }, + "with startup concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(0)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 1, + }, + "with startup and ongoing concurrency": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(2)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 0, + }, + "with startup and ongoing concurrency (different settings)": { + concurrencyVariant: []readerTestCfgOpt{withStartupConcurrency(2), withOngoingConcurrency(4)}, + expectedBufferedRecords: 1, + expectedBufferedRecordsFromClient: 0, + }, + } - consumer := consumerFunc(func(_ context.Context, records []record) error { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() + for concurrencyName, tt := range tc { + concurrencyVariant := tt.concurrencyVariant - for _, r := range records { - consumedRecords = append(consumedRecords, string(r.content)) - } - return nil - }) + t.Run(concurrencyName, func(t *testing.T) { + t.Parallel() - // Produce some records. - writeClient := newKafkaProduceClient(t, clusterAddr) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) - t.Log("produced 2 records") - - // Create and start the reader. - reg := prometheus.NewPedanticRegistry() - logs := &concurrency.SyncBuffer{} - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, - withConsumeFromPositionAtStartup(consumeFromStart), - withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), - withRegistry(reg), - withLogger(log.NewLogfmtLogger(logs)), - // Enable both startup and ongoing fetch concurrency. - withStartupConcurrency(2), - withOngoingConcurrency(2)) - - require.NoError(t, reader.StartAsync(ctx)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) - }) + var ( + ctx = context.Background() + _, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + consumedRecordsMx sync.Mutex + consumedRecords []string + blocked = atomic.NewBool(false) + ) + + consumer := consumerFunc(func(_ context.Context, records []record) error { + if blocked.Load() { + blockedTicker := time.NewTicker(100 * time.Millisecond) + defer blockedTicker.Stop() + outer: + for { + select { + case <-blockedTicker.C: + if !blocked.Load() { + break outer + } + case <-time.After(3 * time.Second): + // This is basically a test failure as we never finish the test in time. + t.Log("failed to finish unblocking the consumer in time") + return nil + } + } + } - // We expect the reader to catch up, and then switch to Running state. - test.Poll(t, 5*time.Second, services.Running, func() interface{} { - return reader.State() - }) + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + for _, r := range records { + consumedRecords = append(consumedRecords, string(r.content)) + } + return nil + }) - // We expect the reader to have switched to running because target consumer lag has been honored. - assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + // Produce some records. + writeClient := newKafkaProduceClient(t, clusterAddr) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-1")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-2")) + t.Log("produced 2 records") - // We expect the reader to have consumed the partition from start. - test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() - return slices.Clone(consumedRecords) - }) + // Create and start the reader. + reg := prometheus.NewPedanticRegistry() + logs := &concurrency.SyncBuffer{} + + readerOpts := append([]readerTestCfgOpt{ + withConsumeFromPositionAtStartup(consumeFromStart), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs)), + }, concurrencyVariant...) + + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, readerOpts...) + require.NoError(t, reader.StartAsync(ctx)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) + }) + + // We expect the reader to catch up, and then switch to Running state. + test.Poll(t, 5*time.Second, services.Running, func() interface{} { + return reader.State() + }) + + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + + // We expect the reader to have consumed the partition from start. + test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) - // Wait some time to give some time for the Kafka client to eventually read and buffer records. - // We don't expect it, but to make sure it's not happening we have to give it some time. - time.Sleep(time.Second) + // Wait some time to give some time for the Kafka client to eventually read and buffer records. + // We don't expect it, but to make sure it's not happening we have to give it some time. + time.Sleep(time.Second) - // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. - test.Poll(t, time.Second, nil, func() interface{} { - return promtest.GatherAndCompare(reg, strings.NewReader(` + // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1 @@ -1868,28 +1919,56 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFet # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0 - `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total") - }) - // Produce more records after the reader has started. - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) - produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) - t.Log("produced 2 records") + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records 0 + `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) - // We expect the reader to consume subsequent records too. - test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { - consumedRecordsMx.Lock() - defer consumedRecordsMx.Unlock() - return slices.Clone(consumedRecords) - }) + // Now, we want to assert that when the reader does have records buffered the metrics correctly reflect the current state. + // First, make the consumer block on the next consumption. + blocked.Store(true) + + // Now, produce more records after the reader has started. + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-3")) + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("record-4")) + t.Log("produced 2 records") + + // Now, we expect to have some records buffered. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` + # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. + # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge + cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 1 + + # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed + # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge + cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} %d + + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records %d + `, tt.expectedBufferedRecordsFromClient, tt.expectedBufferedRecords)), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) + + // With that assertion done, we can unblock records consumption. + blocked.Store(false) + + // We expect the reader to consume subsequent records too. + test.Poll(t, time.Second, []string{"record-1", "record-2", "record-3", "record-4"}, func() interface{} { + consumedRecordsMx.Lock() + defer consumedRecordsMx.Unlock() + return slices.Clone(consumedRecords) + }) - // Wait some time to give some time for the Kafka client to eventually read and buffer records. - // We don't expect it, but to make sure it's not happening we have to give it some time. - time.Sleep(time.Second) + // Wait some time to give some time for the Kafka client to eventually read and buffer records. + // We don't expect it, but to make sure it's not happening we have to give it some time. + time.Sleep(time.Second) - // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. - test.Poll(t, time.Second, nil, func() interface{} { - return promtest.GatherAndCompare(reg, strings.NewReader(` + // We expect the last consumed offset to be tracked in a metric, and there are no buffered records reported. + test.Poll(t, time.Second, nil, func() interface{} { + return promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet. # TYPE cortex_ingest_storage_reader_last_consumed_offset gauge cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 3 @@ -1897,8 +1976,26 @@ func TestPartitionReader_ShouldNotBufferRecordsInTheKafkaClientWhenConcurrentFet # HELP cortex_ingest_storage_reader_buffered_fetch_records_total Total number of records buffered within the client ready to be consumed # TYPE cortex_ingest_storage_reader_buffered_fetch_records_total gauge cortex_ingest_storage_reader_buffered_fetch_records_total{component="partition-reader"} 0 - `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total") - }) + + # HELP cortex_ingest_storage_reader_buffered_fetched_records The number of records fetched from Kafka by both concurrent fetchers and the kafka client but not yet processed. + # TYPE cortex_ingest_storage_reader_buffered_fetched_records gauge + cortex_ingest_storage_reader_buffered_fetched_records 0 + `), "cortex_ingest_storage_reader_last_consumed_offset", "cortex_ingest_storage_reader_buffered_fetch_records_total", "cortex_ingest_storage_reader_buffered_fetched_records") + }) + }) + } +} + +func TestPartitionReader_ShouldNotPanicIfBufferedRecordsIsCalledBeforeStarting(t *testing.T) { + const ( + topicName = "test" + partitionID = 1 + ) + + _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName) + reader := createReader(t, clusterAddr, topicName, partitionID, nil) + + require.Zero(t, reader.BufferedRecords()) } func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) {