Skip to content

Commit

Permalink
Add LabelQuerier.LabelValuesStream method
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Sep 4, 2023
1 parent 002ae0a commit 6e2bc1b
Show file tree
Hide file tree
Showing 20 changed files with 1,371 additions and 14 deletions.
4 changes: 4 additions & 0 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (*errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Wa
return nil, nil, nil
}

func (*errQuerier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues {
return nil
}

func (*errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions storage/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.War
return nil, nil, errors.New("label values error")
}

func (errQuerier) LabelValuesStream(string, ...*labels.Matcher) storage.LabelValues {
return storage.ErrLabelValues(errors.New("label values stream error"))
}

func (errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.New("label names error")
}
Expand Down
24 changes: 24 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (q *MockQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warning
return nil, nil, nil
}

func (q *MockQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues {
return nil
}

func (q *MockQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) {
return nil, nil, nil
}
Expand Down Expand Up @@ -159,6 +163,12 @@ type LabelQuerier interface {
// to label values of metrics matching the matchers.
LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error)

// LabelValuesStream returns an iterator over all potential values for a label name.
// It is not safe to use the strings beyond the lifetime of the querier.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues

// LabelNames returns all the unique label names present in the block in sorted order.
// If matchers are specified the returned result set is reduced
// to label names of metrics matching the matchers.
Expand Down Expand Up @@ -444,3 +454,17 @@ type ChunkIterable interface {
}

type Warnings []error

// LabelValues is an iterator over label values.
type LabelValues interface {
// Next tries to advance the iterator and returns true if it could, false otherwise.
Next() bool
// At returns the current label value.
At() string
// Err is the error that iteration eventually failed with.
// When an error occurs, the iterator cannot continue.
Err() error
// Warnings is a collection of warnings that have occurred during iteration.
// Warnings could be non-empty even if iteration has not failed with error.
Warnings() Warnings
}
147 changes: 147 additions & 0 deletions storage/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,153 @@ func mergeStrings(a, b []string) []string {
return res
}

// LabelValuesStream implements LabelQuerier.
func (q *mergeGenericQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues {
if len(q.queriers) == 0 {
return EmptyLabelValues()
}
if len(q.queriers) == 1 {
return q.queriers[0].LabelValuesStream(name, matchers...)
}

h := make(labelValuesHeap, 0, len(q.queriers))
var ws Warnings
for i, sq := range q.queriers {
fmt.Printf("Calling sub-querier %d for name %q\n", i, name)
it := sq.LabelValuesStream(name, matchers...)
switch {
case it.Next():
fmt.Printf("Got non-empty iterator %p from sub-querier %d\n", it, i)
h = append(h, it)
case it.Err() != nil:
return errLabelValues{
err: it.Err(),
warnings: it.Warnings(),
}
case len(it.Warnings()) > 0:
// Iterator is immediately exhausted, but keep its warnings
ws = append(ws, it.Warnings()...)
default:
fmt.Printf("Got empty iterator from sub-querier %d\n", i)
}
}

return &mergedLabelValues{
h: h,
warnings: ws,
}
}

// mergedLabelValues is a label values iterator merging a collection of sub-iterators.
type mergedLabelValues struct {
h labelValuesHeap
cur string
initialized bool
err error
warnings Warnings
}

func (m *mergedLabelValues) Next() bool {
if m.h.Len() == 0 || m.err != nil {
return false
}

if !m.initialized {
heap.Init(&m.h)
m.cur = m.h[0].At()
fmt.Printf("mergedLabelValues initializing: At the top of the heap is value %q\n", m.cur)
m.initialized = true
return true
}

fmt.Printf("mergedLabelValues.Next() called after initialization\n")
for {
cur := m.h[0]
if !cur.Next() {
fmt.Printf("mergedLabelValues: Current iterator exhausted, popping it from heap\n")
heap.Pop(&m.h)
if len(cur.Warnings()) > 0 {
m.warnings = append(m.warnings, cur.Warnings()...)
}
if cur.Err() != nil {
m.err = cur.Err()
fmt.Printf("mergedLabelValues: Current iterator failed: %q\n", m.err)
return false
}
if m.h.Len() == 0 {
fmt.Printf("mergedLabelValues: Heap empty, returning false\n")
return false
}
} else {
// Heap top has changed, fix up
fmt.Printf("mergedLabelValues: Advanced heap top iterator, re-sorting heap\n")
heap.Fix(&m.h, 0)
}

if m.h[0].At() != m.cur {
m.cur = m.h[0].At()
fmt.Printf("mergedLabelValues: Found new current label value: %q\n", m.cur)
return true
} else {
fmt.Printf("mergedLabelValues: Heap top's value is the same as the current one\n")
}
}
}

func (m *mergedLabelValues) At() string {
return m.cur
}

func (m *mergedLabelValues) Err() error {
return m.err
}

func (m *mergedLabelValues) Warnings() Warnings {
return m.warnings
}

// errLabelValues is an empty label values iterator with an error.
type errLabelValues struct {
err error
warnings Warnings
}

func (e errLabelValues) Next() bool { return false }
func (e errLabelValues) At() string { return "" }
func (e errLabelValues) Err() error { return e.err }
func (e errLabelValues) Warnings() Warnings { return e.warnings }

// ErrLabelValues returns a LabelValues with err.
func ErrLabelValues(err error) LabelValues {
return errLabelValues{err: err}
}

var emptyLabelValues = errLabelValues{}

// EmptyLabelValues returns an empty LabelValues.
func EmptyLabelValues() LabelValues {
return emptyLabelValues
}

// labelValuesHeap is a heap of LabelValues iterators, sorted on label value.
type labelValuesHeap []LabelValues

func (h labelValuesHeap) Len() int { return len(h) }
func (h labelValuesHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
func (h labelValuesHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *labelValuesHeap) Push(x interface{}) {
*h = append(*h, x.(LabelValues))
}

func (h *labelValuesHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Warnings, error) {
var (
Expand Down
100 changes: 90 additions & 10 deletions storage/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,58 @@ func (m *mockGenericQuerier) LabelValues(name string, matchers ...*labels.Matche
return m.resp, m.warnings, m.err
}

func (m *mockGenericQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()

if m.err == nil {
return newLabelValuesList(m.resp, m.warnings)
}
return errLabelValues{
err: m.err,
warnings: m.warnings,
}
}

type labelValuesList struct {
i int
values []string
warnings Warnings
}

func newLabelValuesList(values []string, warnings Warnings) *labelValuesList {
return &labelValuesList{
i: -1,
values: values,
warnings: warnings,
}
}

func (l *labelValuesList) Next() bool {
if l.i >= len(l.values)-1 {
return false
}

l.i++
return true
}

func (l *labelValuesList) At() string {
return l.values[l.i]
}

func (*labelValuesList) Err() error {
return nil
}

func (l *labelValuesList) Warnings() Warnings {
return l.warnings
}

func (m *mockGenericQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesCalls++
Expand Down Expand Up @@ -1081,8 +1133,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedSelectsSeries []labels.Labels
expectedLabels []string

expectedWarnings [4]Warnings
expectedErrs [4]error
expectedWarnings [5]Warnings
expectedErrs [5]error
}{
{},
{
Expand Down Expand Up @@ -1110,7 +1162,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
Expand Down Expand Up @@ -1146,7 +1198,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
Expand All @@ -1159,7 +1211,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
labels.FromStrings("test", "a"),
},
expectedLabels: []string{"a"},
expectedWarnings: [4]Warnings{
expectedWarnings: [5]Warnings{
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
Expand All @@ -1177,7 +1230,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
expectedWarnings: [4]Warnings{
expectedWarnings: [5]Warnings{
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
Expand Down Expand Up @@ -1242,11 +1296,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
}
})
t.Run("LabelValuesStream", func(t *testing.T) {
it := q.LabelValuesStream("test2")
require.NotNil(t, it)
var res []string
for it.Next() {
res = append(res, it.At())
}
require.Equal(t, tcase.expectedWarnings[3], it.Warnings())
err := it.Err()
require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match")
if err != nil {
return
}

require.Equal(t, tcase.expectedLabels, res)

for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2"},
}, m.labelNamesRequested)
}
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues("test2", matcher)
require.Equal(t, tcase.expectedWarnings[3], w)
require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match")
res, w, err := q.LabelValues("test3", matcher)
require.Equal(t, tcase.expectedWarnings[4], w)
require.True(t, errors.Is(err, tcase.expectedErrs[4]), "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)

if err != nil {
Expand All @@ -1257,7 +1336,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {

require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
{name: "test2"},
{name: "test3", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
}
})
Expand Down
8 changes: 8 additions & 0 deletions storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (noopQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings,
return nil, nil, nil
}

func (noopQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues {
return nil
}

func (noopQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) {
return nil, nil, nil
}
Expand All @@ -55,6 +59,10 @@ func (noopChunkQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warni
return nil, nil, nil
}

func (noopChunkQuerier) LabelValuesStream(string, ...*labels.Matcher) LabelValues {
return nil
}

func (noopChunkQuerier) LabelNames(...*labels.Matcher) ([]string, Warnings, error) {
return nil, nil, nil
}
Expand Down
Loading

0 comments on commit 6e2bc1b

Please sign in to comment.