diff --git a/promql/engine_test.go b/promql/engine_test.go index 5a4c68cee9..085e6e9f73 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -202,6 +202,10 @@ func (*errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Wa return nil, nil, nil } +func (*errQuerier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + return nil +} + func (*errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) { return nil, nil, nil } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index b4490636df..57182addb7 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -237,6 +237,10 @@ func (errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.War return nil, nil, errors.New("label values error") } +func (errQuerier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("label values stream error")) +} + func (errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) { return nil, nil, errors.New("label names error") } diff --git a/storage/interface.go b/storage/interface.go index 74ddc5acad..71131b2171 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -122,6 +122,10 @@ func (q *MockQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warning return nil, nil, nil } +func (q *MockQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues { + return nil +} + func (q *MockQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) { return nil, nil, nil } @@ -159,6 +163,12 @@ type LabelQuerier interface { // to label values of metrics matching the matchers. LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) + // LabelValuesStream returns an iterator over all potential values for a label name. + // It is not safe to use the strings beyond the lifetime of the querier. + // If matchers are specified the returned result set is reduced + // to label values of metrics matching the matchers. + LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues + // LabelNames returns all the unique label names present in the block in sorted order. // If matchers are specified the returned result set is reduced // to label names of metrics matching the matchers. @@ -444,3 +454,17 @@ type ChunkIterable interface { } type Warnings []error + +// LabelValues is an iterator over label values. +type LabelValues interface { + // Next tries to advance the iterator and returns true if it could, false otherwise. + Next() bool + // At returns the current label value. + At() string + // Err is the error that iteration eventually failed with. + // When an error occurs, the iterator cannot continue. + Err() error + // Warnings is a collection of warnings that have occurred during iteration. + // Warnings could be non-empty even if iteration has not failed with error. + Warnings() Warnings +} diff --git a/storage/merge.go b/storage/merge.go index a196b0bc0d..229fd827b8 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -216,6 +216,153 @@ func mergeStrings(a, b []string) []string { return res } +// LabelValuesStream implements LabelQuerier. +func (q *mergeGenericQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues { + if len(q.queriers) == 0 { + return EmptyLabelValues() + } + if len(q.queriers) == 1 { + return q.queriers[0].LabelValuesStream(name, matchers...) + } + + h := make(labelValuesHeap, 0, len(q.queriers)) + var ws Warnings + for i, sq := range q.queriers { + fmt.Printf("Calling sub-querier %d for name %q\n", i, name) + it := sq.LabelValuesStream(name, matchers...) + switch { + case it.Next(): + fmt.Printf("Got non-empty iterator %p from sub-querier %d\n", it, i) + h = append(h, it) + case it.Err() != nil: + return errLabelValues{ + err: it.Err(), + warnings: it.Warnings(), + } + case len(it.Warnings()) > 0: + // Iterator is immediately exhausted, but keep its warnings + ws = append(ws, it.Warnings()...) + default: + fmt.Printf("Got empty iterator from sub-querier %d\n", i) + } + } + + return &mergedLabelValues{ + h: h, + warnings: ws, + } +} + +// mergedLabelValues is a label values iterator merging a collection of sub-iterators. +type mergedLabelValues struct { + h labelValuesHeap + cur string + initialized bool + err error + warnings Warnings +} + +func (m *mergedLabelValues) Next() bool { + if m.h.Len() == 0 || m.err != nil { + return false + } + + if !m.initialized { + heap.Init(&m.h) + m.cur = m.h[0].At() + fmt.Printf("mergedLabelValues initializing: At the top of the heap is value %q\n", m.cur) + m.initialized = true + return true + } + + fmt.Printf("mergedLabelValues.Next() called after initialization\n") + for { + cur := m.h[0] + if !cur.Next() { + fmt.Printf("mergedLabelValues: Current iterator exhausted, popping it from heap\n") + heap.Pop(&m.h) + if len(cur.Warnings()) > 0 { + m.warnings = append(m.warnings, cur.Warnings()...) + } + if cur.Err() != nil { + m.err = cur.Err() + fmt.Printf("mergedLabelValues: Current iterator failed: %q\n", m.err) + return false + } + if m.h.Len() == 0 { + fmt.Printf("mergedLabelValues: Heap empty, returning false\n") + return false + } + } else { + // Heap top has changed, fix up + fmt.Printf("mergedLabelValues: Advanced heap top iterator, re-sorting heap\n") + heap.Fix(&m.h, 0) + } + + if m.h[0].At() != m.cur { + m.cur = m.h[0].At() + fmt.Printf("mergedLabelValues: Found new current label value: %q\n", m.cur) + return true + } else { + fmt.Printf("mergedLabelValues: Heap top's value is the same as the current one\n") + } + } +} + +func (m *mergedLabelValues) At() string { + return m.cur +} + +func (m *mergedLabelValues) Err() error { + return m.err +} + +func (m *mergedLabelValues) Warnings() Warnings { + return m.warnings +} + +// errLabelValues is an empty label values iterator with an error. +type errLabelValues struct { + err error + warnings Warnings +} + +func (e errLabelValues) Next() bool { return false } +func (e errLabelValues) At() string { return "" } +func (e errLabelValues) Err() error { return e.err } +func (e errLabelValues) Warnings() Warnings { return e.warnings } + +// ErrLabelValues returns a LabelValues with err. +func ErrLabelValues(err error) LabelValues { + return errLabelValues{err: err} +} + +var emptyLabelValues = errLabelValues{} + +// EmptyLabelValues returns an empty LabelValues. +func EmptyLabelValues() LabelValues { + return emptyLabelValues +} + +// labelValuesHeap is a heap of LabelValues iterators, sorted on label value. +type labelValuesHeap []LabelValues + +func (h labelValuesHeap) Len() int { return len(h) } +func (h labelValuesHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } +func (h labelValuesHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *labelValuesHeap) Push(x interface{}) { + *h = append(*h, x.(LabelValues)) +} + +func (h *labelValuesHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + // LabelNames returns all the unique label names present in all queriers in sorted order. func (q *mergeGenericQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) { var ( diff --git a/storage/merge_test.go b/storage/merge_test.go index 82627d9871..f92f153016 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1014,6 +1014,58 @@ func (m *mockGenericQuerier) LabelValues(name string, matchers ...*labels.Matche return m.resp, m.warnings, m.err } +func (m *mockGenericQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues { + m.mtx.Lock() + m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ + name: name, + matchers: matchers, + }) + m.mtx.Unlock() + + if m.err == nil { + return newLabelValuesList(m.resp, m.warnings) + } + return errLabelValues{ + err: m.err, + warnings: m.warnings, + } +} + +type labelValuesList struct { + i int + values []string + warnings Warnings +} + +func newLabelValuesList(values []string, warnings Warnings) *labelValuesList { + return &labelValuesList{ + i: -1, + values: values, + warnings: warnings, + } +} + +func (l *labelValuesList) Next() bool { + if l.i >= len(l.values)-1 { + return false + } + + l.i++ + return true +} + +func (l *labelValuesList) At() string { + return l.values[l.i] +} + +func (*labelValuesList) Err() error { + return nil +} + +func (l *labelValuesList) Warnings() Warnings { + return l.warnings +} + func (m *mockGenericQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) { m.mtx.Lock() m.labelNamesCalls++ @@ -1081,8 +1133,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedSelectsSeries []labels.Labels expectedLabels []string - expectedWarnings [4]Warnings - expectedErrs [4]error + expectedWarnings [5]Warnings + expectedErrs [5]error }{ {}, { @@ -1110,7 +1162,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { { name: "one failed primary querier", queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}}, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with successful secondaries", @@ -1146,7 +1198,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, }, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with failed secondaries", @@ -1159,7 +1211,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { labels.FromStrings("test", "a"), }, expectedLabels: []string{"a"}, - expectedWarnings: [4]Warnings{ + expectedWarnings: [5]Warnings{ + []error{errStorage, errStorage}, []error{errStorage, errStorage}, []error{errStorage, errStorage}, []error{errStorage, errStorage}, @@ -1177,7 +1230,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { labels.FromStrings("test", "b"), }, expectedLabels: []string{"a", "b"}, - expectedWarnings: [4]Warnings{ + expectedWarnings: [5]Warnings{ + []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, @@ -1242,11 +1296,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested) } }) + t.Run("LabelValuesStream", func(t *testing.T) { + it := q.LabelValuesStream("test2") + require.NotNil(t, it) + var res []string + for it.Next() { + res = append(res, it.At()) + } + require.Equal(t, tcase.expectedWarnings[3], it.Warnings()) + err := it.Err() + require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match") + if err != nil { + return + } + + require.Equal(t, tcase.expectedLabels, res) + + for _, qr := range q.queriers { + m := unwrapMockGenericQuerier(t, qr) + + require.Equal(t, []labelNameRequest{ + {name: "test"}, + {name: "test2"}, + }, m.labelNamesRequested) + } + }) t.Run("LabelValuesWithMatchers", func(t *testing.T) { matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") - res, w, err := q.LabelValues("test2", matcher) - require.Equal(t, tcase.expectedWarnings[3], w) - require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match") + res, w, err := q.LabelValues("test3", matcher) + require.Equal(t, tcase.expectedWarnings[4], w) + require.True(t, errors.Is(err, tcase.expectedErrs[4]), "expected error doesn't match") require.Equal(t, tcase.expectedLabels, res) if err != nil { @@ -1257,7 +1336,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{ {name: "test"}, - {name: "test2", matchers: []*labels.Matcher{matcher}}, + {name: "test2"}, + {name: "test3", matchers: []*labels.Matcher{matcher}}, }, m.labelNamesRequested) } }) diff --git a/storage/noop.go b/storage/noop.go index 83953ca43f..1223572eb7 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -32,6 +32,10 @@ func (noopQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, return nil, nil, nil } +func (noopQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues { + return nil +} + func (noopQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) { return nil, nil, nil } @@ -55,6 +59,10 @@ func (noopChunkQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warni return nil, nil, nil } +func (noopChunkQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues { + return nil +} + func (noopChunkQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) { return nil, nil, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index af61334f48..675378fd09 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -211,12 +211,18 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s return ms, names } -// LabelValues implements storage.Querier and is a noop. +// LabelValues implements storage.LabelQuerier and is a noop. func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 return nil, nil, errors.New("not implemented") } +// LabelValuesStream implements storage.LabelQuerier and is a noop. +func (q *querier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 + return storage.ErrLabelValues(errors.New("not implemented")) +} + // LabelNames implements storage.Querier and is a noop. func (q *querier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 diff --git a/storage/secondary.go b/storage/secondary.go index d66a286172..0138d07c2f 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -55,6 +55,37 @@ func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher) return vals, w, nil } +func (s *secondaryQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues { + it := s.genericQuerier.LabelValuesStream(name, matchers...) + return &secondaryLabelValues{ + it: it, + } +} + +type secondaryLabelValues struct { + it LabelValues +} + +func (s *secondaryLabelValues) Next() bool { + return s.it.Next() +} + +func (s *secondaryLabelValues) At() string { + return s.it.At() +} + +func (s *secondaryLabelValues) Err() error { + return nil +} + +func (s *secondaryLabelValues) Warnings() Warnings { + ws := s.it.Warnings() + if s.it.Err() != nil { + ws = append(ws, s.it.Err()) + } + return ws +} + func (s *secondaryQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) { names, w, err := s.genericQuerier.LabelNames(matchers...) if err != nil { diff --git a/tsdb/block.go b/tsdb/block.go index d1c75fc83a..bbb5e9b937 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -70,6 +70,9 @@ type IndexReader interface { // LabelValues returns possible label values which may not be sorted. LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) + // LabelValuesStream returns an iterator over matching label values. + LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues + // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. @@ -501,6 +504,10 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) return labelValuesWithMatchers(r.ir, name, matchers...) } +func (r blockIndexReader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + return r.ir.LabelValuesStream(name, matchers...) +} + func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { if len(matchers) == 0 { return r.b.LabelNames() diff --git a/tsdb/block_test.go b/tsdb/block_test.go index ac2c0920e1..7cacebe3f2 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -257,7 +257,7 @@ func TestLabelValuesWithMatchers(t *testing.T) { matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, expectedValues: []string{"value5", "value6", "value7"}, }, { - name: "get tens by matching for absence of unique label", + name: "get tens by matching for presence of unique label", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, @@ -278,6 +278,88 @@ func TestLabelValuesWithMatchers(t *testing.T) { } } +func TestLabelValuesStream_WithMatchers(t *testing.T) { + tmpdir := t.TempDir() + + var seriesEntries []storage.Series + for i := 0; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), []tsdbutil.Sample{sample{100, 0, nil, nil}})) + } + + blockDir := createBlock(t, tmpdir, seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(t, err) + require.Greater(t, len(files), 0, "No chunk created.") + + // Check open err. + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + defer func() { require.NoError(t, block.Close()) }() + + indexReader, err := block.Index() + require.NoError(t, err) + defer func() { require.NoError(t, indexReader.Close()) }() + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique ids based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for presence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not emptyy", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: uniqueWithout30s, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + it := indexReader.LabelValuesStream(tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, tt.expectedValues, values) + }) + } +} + // TestBlockSize ensures that the block size is calculated correctly. func TestBlockSize(t *testing.T) { tmpdir := t.TempDir() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index bde729d1dd..0fccc164c2 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6664,3 +6664,97 @@ Outer: require.NoError(t, writerErr) } + +func TestQuerier_LabelValuesStream(t *testing.T) { + db := openTestDB(t, nil, nil) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + var seriesEntries []storage.Series + // Add a block of 70 series with timestamp 1 + for i := 0; i < 70; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), tsdbutil.GenerateSamples(1, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + // Add a block of 50 series with timestamp 2 + // Since "tens" start at 50, two of the label values ("value5", "value6") will overlap with the + // previous block + seriesEntries = seriesEntries[:0] + for i := 50; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), tsdbutil.GenerateSamples(2, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + require.NoError(t, db.reloadBlocks()) + + querier, err := db.Querier(context.Background(), 1, 2) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, querier.Close()) }) + + t.Run("without matchers", func(t *testing.T) { + it := querier.LabelValuesStream("tens") + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, []string{ + "value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + }, values) + }) + + t.Run("with matchers", func(t *testing.T) { + testCases := []struct { + name string + label string + matchers []*labels.Matcher + expLabels []string + }{ + { + name: "matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "tens", "value1"), + }, + expLabels: []string{"value1"}, + }, + { + name: "unsuccessful matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "tens", "value10"), + }, + expLabels: nil, + }, + { + name: "matching on other label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "unique", "value51"), + }, + expLabels: []string{"value5"}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + it := querier.LabelValuesStream(tc.label, tc.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, tc.expLabels, values) + }) + } + }) +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 52fef38463..9db70235e1 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -86,6 +86,20 @@ func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) return labelValuesWithMatchers(h, name, matchers...) } +// LabelValuesStream returns an iterator over label values present in +// the head for the specific label name that are within the time range +// mint to maxt. +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { + return nil + } + + // TODO: Implement matchers + return h.head.postings.LabelValuesStream(name) +} + // LabelNames returns all the unique label names present in the head // that are within the time range mint to maxt. func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 368c7732c2..25873053a3 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2460,6 +2460,66 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } } +func TestHeadLabelValuesStream_WithMatchers(t *testing.T) { + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + t.Cleanup(func() { require.NoError(t, head.Close()) }) + + app := head.Appender(context.Background()) + for i := 0; i < 100; i++ { + _, err := app.Append(0, labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), 100, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique ids based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for absence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + headIdxReader := head.indexRange(0, 200) + + it := headIdxReader.LabelValuesStream(tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + + require.Equal(t, tt.expectedValues, values) + }) + } +} + func TestHeadLabelNamesWithMatchers(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 3b672ec2cc..dd2114e732 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -25,6 +25,7 @@ import ( "math" "os" "path/filepath" + "reflect" "sort" "unsafe" @@ -1502,6 +1503,7 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string return values, nil } + e, ok := r.postings[name] if !ok { return nil, nil @@ -1540,6 +1542,37 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string return values, nil } +func (r *Reader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + + if r.version == FormatV1 { + p := r.postingsV1[name] + // TODO: Handle matchers on other label names + return &labelValuesV1{ + matchers: matchers, + it: reflect.ValueOf(p).MapRange(), + } + } + + p := r.postings[name] + if len(p) == 0 { + return nil + } + + if ownMatchers == len(matchers) { + // All matchers are for the requested label name + return r.newLabelValuesV2(name, matchers) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return r.labelValuesForMatchersStream(name, matchers) +} + // LabelNamesFor returns all the label names for the series referred to by IDs. // The names returned are sorted. func (r *Reader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) { diff --git a/tsdb/index/labelvalues.go b/tsdb/index/labelvalues.go new file mode 100644 index 0000000000..9d66fae5a8 --- /dev/null +++ b/tsdb/index/labelvalues.go @@ -0,0 +1,717 @@ +package index + +import ( + "container/heap" + "fmt" + "reflect" + "sort" + + "github.com/pkg/errors" + "golang.org/x/exp/maps" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/encoding" +) + +type labelValuesV2 struct { + name string + cur string + dec encoding.Decbuf + matchers []*labels.Matcher + skip int + lastVal string + exhausted bool + err error +} + +// newLabelValuesV2 returns an iterator over label values in a v2 index. +func (r *Reader) newLabelValuesV2(name string, matchers []*labels.Matcher) storage.LabelValues { + p := r.postings[name] + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(p[0].off) + // These are always the same number of bytes, and it's faster to skip than to parse + skip := d.Len() + // Key count + d.Uvarint() + // Label name + d.UvarintBytes() + skip -= d.Len() + + return &labelValuesV2{ + name: name, + matchers: matchers, + dec: d, + lastVal: p[len(p)-1].value, + skip: skip, + } +} + +func (l *labelValuesV2) Next() bool { + if l.err != nil || l.exhausted { + return false + } + + // Pick the first matching label value + for l.dec.Err() == nil { + // Label value + val := yoloString(l.dec.UvarintBytes()) + isMatch := true + for _, m := range l.matchers { + if m.Name != l.name { + // This should not happen + continue + } + + if !m.Matches(val) { + isMatch = false + break + } + } + + if isMatch { + l.cur = val + } + if val == l.lastVal { + l.exhausted = true + return isMatch + } + + // Offset + l.dec.Uvarint64() + // Skip forward to next entry + l.dec.Skip(l.skip) + + if isMatch { + break + } + } + if l.dec.Err() != nil { + // An error occurred skipping to this entry + l.err = errors.Wrap(l.dec.Err(), "get postings offset entry") + return false + } + + return true +} + +func (l *labelValuesV2) At() string { + return l.cur +} + +func (l *labelValuesV2) Err() error { + return l.err +} + +func (l *labelValuesV2) Warnings() storage.Warnings { + return nil +} + +func (r *Reader) labelValuesForMatchersStream(name string, matchers []*labels.Matcher) storage.LabelValues { + // See which labels must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(matchers)) + for _, m := range matchers { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + + fmt.Printf("labelValuesForMatchersStream getting label values iterator for %q\n", name) + + // Make sure to intersect with series containing the label name + pit := r.postingsWithLabel(name) + if pit.Err() != nil { + return storage.ErrLabelValues(pit.Err()) + } + if IsEmptyPostingsType(pit) { + return storage.EmptyLabelValues() + } + its := []Postings{pit} + var notIts []Postings + for _, m := range matchers { + switch { + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := r.postingsForMatcher(inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := r.inversePostingsForMatcher(inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + if IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. + it, err := r.postingsForMatcher(m) + if err != nil { + return storage.ErrLabelValues(err) + } + if IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + } + default: // l="" + // If a matcher for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := r.inversePostingsForMatcher(m) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + } + } + + it := newIntersectLabelValues(its, notIts) + + // Fill in a map for de-duplication + m := map[string]struct{}{} + for it.Next() { + m[it.At()] = struct{}{} + } + if it.Err() != nil { + return storage.ErrLabelValues(it.Err()) + } + + vals := make([]string, 0, len(m)) + for v := range m { + vals = append(vals, v) + } + maps.Clear(m) + sort.Strings(vals) + fmt.Printf("labelValuesForMatchersStream returning merging label values iterator for %q\n", name) + return newListLabelValues(vals) +} + +// newIntersectLabelValues returns an iterator over label values stemming from the intersection of the Postings +// iterators in its, minus the ones in notIts. +func newIntersectLabelValues(its, notIts []Postings) storage.LabelValues { + if len(its) == 0 { + return storage.EmptyLabelValues() + } + for _, p := range its { + if p == EmptyPostings() { + return storage.EmptyLabelValues() + } + } + + if len(its) == 1 { + it, ok := its[0].(postingsWithLabelValues) + if !ok { + return storage.ErrLabelValues(fmt.Errorf("input iterator should be a storage.LabelValues")) + } + if !it.Next() { + return storage.EmptyLabelValues() + } + if it.Err() != nil { + storage.ErrLabelValues(it.Err()) + } + return newListLabelValues([]string{it.ValueAt()}) + } + + return &intersectPostingsWithLabel{ + arr: its, + remove: Merge(notIts...), + } +} + +type intersectPostingsWithLabel struct { + arr []Postings + remove Postings + curSeries storage.SeriesRef + cur string +} + +func (it *intersectPostingsWithLabel) At() string { + return it.cur +} + +func (it *intersectPostingsWithLabel) Next() bool { + // Advance all iterators in it.arr and record the maximum of their current series IDs as a starting point + for _, p := range it.arr { + if !p.Next() { + return false + } + + cur := p.At() + if cur > it.curSeries { + // This is a candidate, but make sure it's not in the remove set + if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { + continue + } + it.curSeries = p.At() + } + } + + return it.doNext() +} + +func (it *intersectPostingsWithLabel) doNext() bool { + // Find next series ref greater than the current one, which all sub-iterators have in common +loop: + for { + for _, p := range it.arr { + if !p.Seek(it.curSeries) { + // Unable to find a common series ref >= the current one + return false + } + cur := p.At() + + if cur > it.curSeries { + // This is a candidate, but make sure it's not in the remove set + if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { + continue + } + + it.curSeries = cur + continue loop + } + + if lvIt, ok := p.(postingsWithLabelValues); ok { + it.cur = lvIt.ValueAt() + } + } + + // All sub-iterators are currently aligned on the same series ref + return true + } +} + +func (it *intersectPostingsWithLabel) Seek(id storage.SeriesRef) bool { + if it.curSeries >= id { + return true + } + + it.curSeries = id + return it.doNext() +} + +func (it *intersectPostingsWithLabel) Err() error { + for _, p := range it.arr { + if p.Err() != nil { + return p.Err() + } + } + return nil +} + +func (*intersectPostingsWithLabel) Warnings() storage.Warnings { + return nil +} + +// postingsForMatcher returns a Postings iterator matching m. +func (r *Reader) postingsForMatcher(m *labels.Matcher) (Postings, error) { + // This method will not return postings for missing labels. + + // Fast-path for equal matching. + if m.Type == labels.MatchEqual { + return r.Postings(m.Name, m.Value) + } + + // Fast-path for set matching. + if m.Type == labels.MatchRegexp { + setMatches := m.SetMatches() + if len(setMatches) > 0 { + return r.Postings(m.Name, setMatches...) + } + } + + vals, err := r.LabelValues(m.Name) + if err != nil { + return nil, err + } + + var res []string + for _, val := range vals { + if m.Matches(val) { + res = append(res, val) + } + } + + if len(res) == 0 { + return EmptyPostings(), nil + } + + return r.Postings(m.Name, res...) +} + +// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. +func (r *Reader) inversePostingsForMatcher(m *labels.Matcher) (Postings, error) { + // Fast-path for MatchNotRegexp matching. + // Inverse of a MatchNotRegexp is MatchRegexp (double negation). + // Fast-path for set matching. + if m.Type == labels.MatchNotRegexp { + setMatches := m.SetMatches() + if len(setMatches) > 0 { + return r.Postings(m.Name, setMatches...) + } + } + + // Fast-path for MatchNotEqual matching. + // Inverse of a MatchNotEqual is MatchEqual (double negation). + if m.Type == labels.MatchNotEqual { + return r.Postings(m.Name, m.Value) + } + + vals, err := r.LabelValues(m.Name) + if err != nil { + return nil, err + } + + var res []string + // If the inverse match is ="", we just want all the values. + if m.Type == labels.MatchEqual && m.Value == "" { + res = vals + } else { + for _, val := range vals { + if !m.Matches(val) { + res = append(res, val) + } + } + } + + return r.Postings(m.Name, res...) +} + +type labelValuesV1 struct { + it *reflect.MapIter + matchers []*labels.Matcher +} + +func (l *labelValuesV1) Next() bool { + // TODO: Implement matchers + return l.it.Next() +} + +func (l *labelValuesV1) At() string { + return yoloString(l.it.Value().Bytes()) +} + +func (*labelValuesV1) Err() error { + return nil +} + +func (*labelValuesV1) Warnings() storage.Warnings { + return nil +} + +// postingsWithLabelValues is a Postings iterator also tracking label values. +type postingsWithLabelValues interface { + Postings + + // ValueAt returns the current label value. + ValueAt() string +} + +// postingsWithLabel returns a postingsWithLabelValues iterator over postings (series refs) with the specified +// label name, that also tracks the associated label values. +func (r *Reader) postingsWithLabel(name string) postingsWithLabelValues { + if r.version == FormatV1 { + e := r.postingsV1[name] + if len(e) == 0 { + return &wrapPostingsWithLabelValue{p: EmptyPostings()} + } + + var res []postingsWithLabelValues + for val, off := range e { + // Read from the postings table. + d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) + _, p, err := r.dec.Postings(d.Get()) + if err != nil { + return &wrapPostingsWithLabelValue{err: errors.Wrap(err, "decode postings")} + } + res = append(res, &wrapPostingsWithLabelValue{ + value: val, + p: p, + }) + } + return newMergedPostingsWithLabelValues(res) + } + + e := r.postings[name] + if len(e) == 0 { + return &wrapPostingsWithLabelValue{p: EmptyPostings()} + } + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + // Skip to start + d.Skip(e[0].off) + lastVal := e[len(e)-1].value + + skip := 0 + var res []postingsWithLabelValues + for d.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than to parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + v := yoloString(d.UvarintBytes()) // Label value. + + postingsOff := d.Uvarint64() + // Read from the postings table + d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) + _, p, err := r.dec.Postings(d2.Get()) + if err != nil { + return &wrapPostingsWithLabelValue{err: errors.Wrap(err, "decode postings")} + } + res = append(res, &wrapPostingsWithLabelValue{ + value: v, + p: p, + }) + + if v == lastVal { + break + } + } + if d.Err() != nil { + return &wrapPostingsWithLabelValue{err: errors.Wrap(d.Err(), "get postings offset entry")} + } + + return newMergedPostingsWithLabelValues(res) +} + +// wrapPostingsWithLabelValue is a Postings iterator, that also wraps the label value they have in common. +type wrapPostingsWithLabelValue struct { + value string + p Postings + err error +} + +func (it *wrapPostingsWithLabelValue) Next() bool { + return it.p.Next() +} + +// At returns the current series ref. +func (it *wrapPostingsWithLabelValue) At() storage.SeriesRef { + return it.p.At() +} + +// ValueAt returns the associated label value. +func (it *wrapPostingsWithLabelValue) ValueAt() string { + return it.value +} + +func (it *wrapPostingsWithLabelValue) Seek(v storage.SeriesRef) bool { + return it.p.Seek(v) +} + +func (it *wrapPostingsWithLabelValue) Err() error { + if it.err != nil { + return it.err + } + return it.p.Err() +} + +func (*wrapPostingsWithLabelValue) Warnings() storage.Warnings { + return nil +} + +type listLabelValues struct { + cur string + values []string + matchers []*labels.Matcher +} + +func newListLabelValues(values []string, matchers ...*labels.Matcher) *listLabelValues { + return &listLabelValues{ + values: values, + matchers: matchers, + } +} + +func (l *listLabelValues) Next() bool { + if len(l.values) == 0 { + return false + } + + // TODO: Implement matchers + l.cur = l.values[0] + l.values = l.values[1:] + return true +} + +func (l *listLabelValues) At() string { + return l.cur +} + +func (*listLabelValues) Err() error { + return nil +} + +func (*listLabelValues) Warnings() storage.Warnings { + return nil +} + +func newMergedPostingsWithLabelValues(postings []postingsWithLabelValues) *mergedPostingsWithLabelValues { + if len(postings) == 0 { + return &mergedPostingsWithLabelValues{} + } + + h := make(postingsWithLabelValuesHeap, 0, len(postings)) + for _, p := range postings { + switch { + case p.Next(): + h = append(h, p) + case p.Err() != nil: + return &mergedPostingsWithLabelValues{err: p.Err()} + } + } + + return &mergedPostingsWithLabelValues{h: h} +} + +// mergedPostingsWithLabelValues is an iterator that merges postingsWithLabelValues iterators, based on their +// respective series references. +type mergedPostingsWithLabelValues struct { + h postingsWithLabelValuesHeap + err error + initialized bool + cur storage.SeriesRef + curVal string +} + +func (it *mergedPostingsWithLabelValues) Next() bool { + if it.h.Len() == 0 || it.err != nil { + return false + } + + if !it.initialized { + heap.Init(&it.h) + it.cur = it.h[0].At() + it.curVal = it.h[0].ValueAt() + it.initialized = true + fmt.Printf("mergedPostingsWithLabelValues initialized, current series ref: %d, current value: %s\n", + it.cur, it.curVal) + return true + } + + for { + cur := it.h[0] + if !cur.Next() { + fmt.Printf("mergedPostingsWithLabelValues popping current iterator from heap, since it's exhausted\n") + heap.Pop(&it.h) + if cur.Err() != nil { + it.err = cur.Err() + fmt.Printf("mergedPostingsWithLabelValues failing Next() due to error: %s\n", it.err) + return false + } + if it.h.Len() == 0 { + fmt.Printf("mergedPostingsWithLabelValues failing Next() due to exhaustion\n") + return false + } + } else { + // Value of top of heap has changed, re-heapify. + heap.Fix(&it.h, 0) + } + + sr := it.h[0].At() + val := it.h[0].ValueAt() + if sr != it.cur && val != it.curVal { + it.cur = sr + it.curVal = val + fmt.Printf("mergedPostingsWithLabelValues Next() successful, current series ref changed to %d, value: %s\n", it.cur, it.curVal) + return true + } + } +} + +// Seek advances the iterator until the current series reference is >= id, or the iterator is exhausted. +func (it *mergedPostingsWithLabelValues) Seek(id storage.SeriesRef) bool { + if it.h.Len() == 0 || it.err != nil { + return false + } + if !it.initialized && !it.Next() { + return false + } + + fmt.Printf("mergedPostingsWithLabelValues seeking series ref >= %d\n", id) + for it.cur < id { + cur := it.h[0] + if !cur.Seek(id) { + heap.Pop(&it.h) + if cur.Err() != nil { + it.err = cur.Err() + return false + } + if it.h.Len() == 0 { + return false + } + } else { + // Value of top of heap has changed, re-heapify. + heap.Fix(&it.h, 0) + } + + it.cur = it.h[0].At() + it.curVal = it.h[0].ValueAt() + } + fmt.Printf("mergedPostingsWithLabelValues found series ref %d, value: %q\n", it.cur, it.curVal) + return true +} + +func (it mergedPostingsWithLabelValues) At() storage.SeriesRef { + return it.cur +} + +func (it mergedPostingsWithLabelValues) ValueAt() string { + return it.curVal +} + +func (it mergedPostingsWithLabelValues) Err() error { + return it.err +} + +func (mergedPostingsWithLabelValues) Warnings() storage.Warnings { + return nil +} + +type postingsWithLabelValuesHeap []postingsWithLabelValues + +func (h postingsWithLabelValuesHeap) Len() int { return len(h) } +func (h postingsWithLabelValuesHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } +func (h *postingsWithLabelValuesHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *postingsWithLabelValuesHeap) Push(x interface{}) { + *h = append(*h, x.(postingsWithLabelValues)) +} + +func (h *postingsWithLabelValuesHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 9de86f5486..79df1eec19 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -146,6 +146,18 @@ func (p *MemPostings) LabelValues(name string) []string { return values } +// LabelValuesStream returns an iterator over label values for the given name. +func (p *MemPostings) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + p.mtx.RLock() + defer p.mtx.RUnlock() + + values := make([]string, 0, len(p.m[name])) + for v := range p.m[name] { + values = append(values, v) + } + return newListLabelValues(values, matchers...) +} + // PostingsStats contains cardinality based statistics for postings. type PostingsStats struct { CardinalityMetricsStats []Stat diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 33f774a8c2..e28853d0c6 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -175,6 +175,18 @@ func (oh *OOOHeadIndexReader) LabelValues(name string, matchers ...*labels.Match return labelValuesWithMatchers(oh, name, matchers...) } +// LabelValuesStream needs to be overridden from the headIndexReader implementation due +// to the check that happens at the beginning where we make sure that the query +// interval overlaps with the head minooot and maxooot. +func (oh *OOOHeadIndexReader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { + return nil + } + + // TODO: Implement matchers + return oh.head.postings.LabelValuesStream(name) +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef @@ -431,6 +443,10 @@ func (ir *OOOCompactionHeadIndexReader) LabelValues(name string, matchers ...*la return nil, errors.New("not implemented") } +func (ir *OOOCompactionHeadIndexReader) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("not implemented")) +} + func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return nil, errors.New("not implemented") } diff --git a/tsdb/querier.go b/tsdb/querier.go index f54236cec4..c66891b078 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -76,6 +76,10 @@ func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) return res, nil, err } +func (q *blockBaseQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + return q.index.LabelValuesStream(name, matchers...) +} + func (q *blockBaseQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { res, err := q.index.LabelNames(matchers...) return res, nil, err @@ -242,7 +246,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P its = append(its, it) } default: // l="" - // If the matchers for a labelname selects an empty value, it selects all + // If a matcher for a labelname selects an empty value, it selects all // the series which don't have the label name set too. See: // https://github.com/prometheus/prometheus/issues/3575 and // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index af1b0f1b2f..a4d0bc06f4 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1521,6 +1521,11 @@ func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]stri return values, nil } +func (m mockIndex) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { + // TODO + return nil +} + func (m mockIndex) LabelValueFor(id storage.SeriesRef, label string) (string, error) { return m.series[id].l.Get(label), nil } @@ -2397,6 +2402,11 @@ func (m mockMatcherIndex) LabelValues(name string, matchers ...*labels.Matcher) return []string{}, errors.New("label values called") } +// LabelValuesStream will return a failing label values iterator. +func (m mockMatcherIndex) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("label values stream called")) +} + func (m mockMatcherIndex) LabelValueFor(id storage.SeriesRef, label string) (string, error) { return "", errors.New("label value for called") } @@ -2789,7 +2799,7 @@ func TestPrependPostings(t *testing.T) { }) } -func TestLabelsValuesWithMatchersOptimization(t *testing.T) { +func TestLabelValuesWithMatchersOptimization(t *testing.T) { dir := t.TempDir() opts := DefaultHeadOptions() opts.ChunkRange = 1000 diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index afdd673375..173673b0d2 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -174,6 +174,10 @@ func (t errorTestQuerier) LabelValues(name string, matchers ...*labels.Matcher) return nil, nil, t.err } +func (t errorTestQuerier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(t.err) +} + func (t errorTestQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { return nil, nil, t.err }