Skip to content

Commit

Permalink
Don't hold labels from store-gateways in two forms, and don't convert…
Browse files Browse the repository at this point in the history
… them multiple times (#9914) (#9930)

* Don't hold labels from store-gateways in two forms

* Don't retain labels longer than needed

* Don't convert mimirpb.LabelAdaptors to labels.Labels multiple times

* Add changelog entry

(cherry picked from commit d2367de)

Co-authored-by: Charles Korn <[email protected]>
  • Loading branch information
grafanabot and charleskorn authored Nov 18, 2024
1 parent c3b4ada commit cb2a2b6
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
* [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2557,7 +2557,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

result := make([]labels.Labels, 0, len(metrics))
for _, m := range metrics {
if err := queryLimiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(m)); err != nil {
if err := queryLimiter.AddSeries(m); err != nil {
return nil, err
}
result = append(result, m)
Expand Down
10 changes: 6 additions & 4 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
return ingesterQueryResult{}, limitErr
}
}
Expand All @@ -285,7 +285,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(series.Labels); err != nil {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
return ingesterQueryResult{}, err
}
}
Expand All @@ -300,7 +300,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
streamingSeriesCount += len(resp.StreamingSeries)

for _, s := range resp.StreamingSeries {
if err := queryLimiter.AddSeries(s.Labels); err != nil {
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)

if err := queryLimiter.AddSeries(l); err != nil {
return ingesterQueryResult{}, err
}

Expand All @@ -313,7 +315,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
labelsBatch = append(labelsBatch, l)
}

streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
Expand Down
13 changes: 8 additions & 5 deletions pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
Expand All @@ -31,7 +30,7 @@ import (

// Implementation of storage.SeriesSet, based on individual responses from store client.
type blockStreamingQuerierSeriesSet struct {
series []*storepb.StreamingSeries
series []labels.Labels
streamReader chunkStreamReader

// next response to process
Expand All @@ -55,18 +54,22 @@ func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
return false
}

currLabels := bqss.series[bqss.nextSeriesIndex].Labels
currLabels := bqss.series[bqss.nextSeriesIndex]
seriesIdxStart := bqss.nextSeriesIndex // First series in this group. We might merge with more below.
bqss.nextSeriesIndex++

// Chunks may come in multiple responses, but as soon as the response has chunks for a new series,
// we can stop searching. Series are sorted. See documentation for StoreClient.Series call for details.
// The actually merging of chunks happens in the Iterator() call where chunks are fetched.
for bqss.nextSeriesIndex < len(bqss.series) && mimirpb.CompareLabelAdapters(currLabels, bqss.series[bqss.nextSeriesIndex].Labels) == 0 {
for bqss.nextSeriesIndex < len(bqss.series) && labels.Equal(currLabels, bqss.series[bqss.nextSeriesIndex]) {
bqss.nextSeriesIndex++
}

bqss.currSeries = newBlockStreamingQuerierSeries(mimirpb.FromLabelAdaptersToLabels(currLabels), seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)

// Clear any labels we no longer need, to allow them to be garbage collected when they're no longer needed elsewhere.
clear(bqss.series[seriesIdxStart : bqss.nextSeriesIndex-1])

return true
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/querier/block_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc/metadata"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util/limiter"
Expand Down Expand Up @@ -166,9 +165,7 @@ func TestBlockStreamingQuerierSeriesSet(t *testing.T) {
t.Run(name, func(t *testing.T) {
ss := &blockStreamingQuerierSeriesSet{streamReader: &mockChunkStreamer{series: c.input, causeError: c.errorChunkStreamer}}
for _, s := range c.input {
ss.series = append(ss.series, &storepb.StreamingSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(s.lbls),
})
ss.series = append(ss.series, s.lbls)
}
idx := 0
var it chunkenc.Iterator
Expand Down
30 changes: 18 additions & 12 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return err
}

// A storegateway client will only fill either of mySeries or myStreamingSeries, and not both.
// A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both.
mySeries := []*storepb.Series(nil)
myStreamingSeries := []*storepb.StreamingSeries(nil)
myStreamingSeriesLabels := []labels.Labels(nil)
var myWarnings annotations.Annotations
myQueriedBlocks := []ulid.ULID(nil)
indexBytesFetched := uint64(0)
Expand Down Expand Up @@ -813,7 +813,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(s.Labels); err != nil {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return err
}

Expand Down Expand Up @@ -853,16 +853,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
// Add series fingerprint to query limiter; will return error if we are over the limit
if limitErr := queryLimiter.AddSeries(s.Labels); limitErr != nil {
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)

if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
}
myStreamingSeries = append(myStreamingSeries, ss.Series...)

if ss.IsEndOfSeriesStream {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeries) == 0 {
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

Expand Down Expand Up @@ -904,13 +910,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
chunkInfo.EndSeries(i == len(mySeries)-1)
}
}
} else if len(myStreamingSeries) > 0 {
} else if len(myStreamingSeriesLabels) > 0 {
// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, reqStats, q.metrics, q.logger)
level.Debug(log).Log("msg", "received streaming series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", len(myStreamingSeries),
"fetched series", len(myStreamingSeriesLabels),
"fetched index bytes", indexBytesFetched,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
Expand All @@ -925,12 +931,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
mtx.Lock()
if len(mySeries) > 0 {
seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries})
} else if len(myStreamingSeries) > 0 {
} else if len(myStreamingSeriesLabels) > 0 {
if chunkInfo != nil {
chunkInfo.SetMsg("store-gateway streaming")
}
seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
series: myStreamingSeries,
series: myStreamingSeriesLabels,
streamReader: streamReader,
chunkInfo: chunkInfo,
remoteAddress: c.RemoteAddress(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/limiter/query_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"context"
"sync"

"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -74,12 +74,12 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
}

// AddSeries adds the input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(seriesLabels []mimirpb.LabelAdapter) validation.LimitError {
func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
// If the max series is unlimited just return without managing map
if ql.maxSeriesPerQuery == 0 {
return nil
}
fingerprint := mimirpb.FromLabelAdaptersToLabels(seriesLabels).Hash()
fingerprint := seriesLabels.Hash()

ql.uniqueSeriesMx.Lock()
defer ql.uniqueSeriesMx.Unlock()
Expand Down
17 changes: 8 additions & 9 deletions pkg/util/limiter/query_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
)

Expand All @@ -37,15 +36,15 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err := limiter.AddSeries(series1)
assert.NoError(t, err)
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
assert.NoError(t, err)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

// Re-add previous series to make sure it's not double counted
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err = limiter.AddSeries(series1)
assert.NoError(t, err)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
Expand All @@ -72,21 +71,21 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err := limiter.AddSeries(series1)
require.NoError(t, err)
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add the same series again and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add another series and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series3))
err = limiter.AddSeries(series3)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
}
Expand Down Expand Up @@ -188,7 +187,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
for _, s := range series {
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(s))
err := limiter.AddSeries(s)
assert.NoError(b, err)
}
}
Expand Down

0 comments on commit cb2a2b6

Please sign in to comment.