Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

continuous test rule evaluation scratch #9800

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions development/mimir-microservices-mode/config/prometheus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ scrape_configs:
- job_name: mimir-microservices
static_configs:
- targets:
- 'distributor-1:8000'
- 'distributor-2:8001'
- 'ingester-1:8002'
- 'ingester-2:8003'
- 'ingester-3:8004'
- 'querier:8005'
- 'ruler-1:8022'
- 'ruler-2:8023'
- 'compactor:8006'
- 'query-frontend:8007'
- 'store-gateway-1:8008'
- 'store-gateway-2:8009'
- 'query-scheduler:8011'
# - 'distributor-1:8000'
# - 'distributor-2:8001'
# - 'ingester-1:8002'
# - 'ingester-2:8003'
# - 'ingester-3:8004'
# - 'querier:8005'
# - 'ruler-1:8022'
# - 'ruler-2:8023'
# - 'compactor:8006'
# - 'query-frontend:8007'
# - 'store-gateway-1:8008'
# - 'store-gateway-2:8009'
# - 'query-scheduler:8011'
- 'continuous-test:8090'
labels:
cluster: 'docker-compose'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ std.manifestYamlDoc({
' -tests.read-endpoint=http://query-frontend:8007/prometheus' +
' -tests.tenant-id=mimir-continuous-test' +
' -tests.write-endpoint=http://distributor-1:8000' +
' -tests.recording-rule-read-series-test.max-query-age=1h' +
' -tests.write-read-series-test.max-query-age=1h' +
' -tests.write-read-series-test.num-series=100',
}),
Expand Down
36 changes: 32 additions & 4 deletions pkg/continuoustest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ type MimirClient interface {
// QueryRange performs a range query.
QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration, options ...RequestOption) (model.Matrix, error)

// Query performs an instant query.
Query(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error)
// QueryInstant performs an instant query.
QueryInstant(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error)

// QueryInstantRangeVector performs an instant query with a range vector.
QueryInstantRangeVector(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Matrix, error)
}

type ClientConfig struct {
Expand Down Expand Up @@ -185,8 +188,8 @@ func (c *Client) QueryRange(ctx context.Context, query string, start, end time.T
return matrix, nil
}

// Query implements MimirClient.
func (c *Client) Query(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error) {
// QueryInstant implements MimirClient.
func (c *Client) QueryInstant(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error) {
ctx = contextWithRequestOptions(ctx, options...)
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()
Expand All @@ -210,6 +213,31 @@ func (c *Client) Query(ctx context.Context, query string, ts time.Time, options
return vector, nil
}

// QueryInstantRangeVector implements MimirClient.
func (c *Client) QueryInstantRangeVector(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Matrix, error) {
ctx = contextWithRequestOptions(ctx, options...)
ctx, cancel := context.WithTimeout(ctx, c.cfg.ReadTimeout)
defer cancel()

ctx = querierapi.ContextWithReadConsistencyLevel(ctx, querierapi.ReadConsistencyStrong)

value, _, err := c.readClient.Query(ctx, query, ts)
if err != nil {
return nil, err
}

if value.Type() != model.ValMatrix {
return nil, fmt.Errorf("was expecting to get a Matrix, but got %s", value.Type().String())
}

matrix, ok := value.(model.Matrix)
if !ok {
return nil, fmt.Errorf("failed to cast type to Matrix, type was %T", value)
}

return matrix, nil
}

// WriteSeries implements MimirClient.
func (c *Client) WriteSeries(ctx context.Context, series []prompb.TimeSeries) (int, error) {
lastStatusCode := 0
Expand Down
13 changes: 9 additions & 4 deletions pkg/continuoustest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestClient_Query(t *testing.T) {
t.Run("results cache not explicitly disabled", func(t *testing.T) {
receivedRequests = nil

_, err := c.Query(ctx, "up", time.Unix(0, 0))
_, err := c.QueryInstant(ctx, "up", time.Unix(0, 0))
require.NoError(t, err)

require.Len(t, receivedRequests, 1)
Expand All @@ -287,7 +287,7 @@ func TestClient_Query(t *testing.T) {
t.Run("results cache disabled", func(t *testing.T) {
receivedRequests = nil

_, err := c.Query(ctx, "up", time.Unix(0, 0), WithResultsCacheEnabled(false))
_, err := c.QueryInstant(ctx, "up", time.Unix(0, 0), WithResultsCacheEnabled(false))
require.NoError(t, err)

require.Len(t, receivedRequests, 1)
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestClient_QueryHeaders(t *testing.T) {

ctx := context.Background()

_, err = c.Query(ctx, "up", time.Unix(0, 0))
_, err = c.QueryInstant(ctx, "up", time.Unix(0, 0))
require.NoError(t, err)

require.Len(t, receivedRequests, 1)
Expand Down Expand Up @@ -430,7 +430,12 @@ func (m *ClientMock) QueryRange(ctx context.Context, query string, start, end ti
return args.Get(0).(model.Matrix), args.Error(1)
}

func (m *ClientMock) Query(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error) {
func (m *ClientMock) QueryInstant(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Vector, error) {
args := m.Called(ctx, query, ts, options)
return args.Get(0).(model.Vector), args.Error(1)
}

func (m *ClientMock) QueryInstantRangeVector(ctx context.Context, query string, ts time.Time, options ...RequestOption) (model.Matrix, error) {
args := m.Called(ctx, query, ts, options)
return args.Get(0).(model.Matrix), args.Error(1)
}
8 changes: 5 additions & 3 deletions pkg/continuoustest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
)

type Config struct {
Client ClientConfig `yaml:"-"`
Manager ManagerConfig `yaml:"-"`
WriteReadSeriesTest WriteReadSeriesTestConfig `yaml:"-"`
Client ClientConfig `yaml:"-"`
Manager ManagerConfig `yaml:"-"`
WriteReadSeriesTest WriteReadSeriesTestConfig `yaml:"-"`
RecordingRuleReadSeriesTest RecordingRuleReadSeriesTestConfig `yaml:"-"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Client.RegisterFlags(f)
cfg.Manager.RegisterFlags(f)
cfg.WriteReadSeriesTest.RegisterFlags(f)
cfg.RecordingRuleReadSeriesTest.RegisterFlags(f)
}
112 changes: 112 additions & 0 deletions pkg/continuoustest/recording_rule_read_series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package continuoustest

import (
"context"
"flag"
"fmt"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"time"

"github.com/go-kit/log"
)

type RecordingRuleReadSeriesTestConfig struct {
MaxQueryAge time.Duration
}

func (cfg *RecordingRuleReadSeriesTestConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxQueryAge, "tests.recording-rule-read-series-test.max-query-age", 7*24*time.Hour, "Limit how far into the past metrics can be queried.")

}

type RecordingRuleReadSeriesTest struct {
name string
cfg RecordingRuleReadSeriesTestConfig
client MimirClient
logger log.Logger
metrics *TestMetrics

recordedFloatMetric MetricHistory
}

const recordingRuleMetricName = "continuous_test:time"

func NewRecordingRuleReadSeriesTest(cfg RecordingRuleReadSeriesTestConfig, client MimirClient, logger log.Logger, reg prometheus.Registerer) *RecordingRuleReadSeriesTest {
const name = "recording-rule-read-series"

return &RecordingRuleReadSeriesTest{
name: name,
cfg: cfg,
client: client,
logger: log.With(logger, "test", name),
metrics: NewTestMetrics(name, reg),
}
}

// Name implements Test.
func (t *RecordingRuleReadSeriesTest) Name() string {
return t.name
}

// Init implements Test.
func (t *RecordingRuleReadSeriesTest) Init(_ context.Context, _ time.Time) error {
t.metrics.InitializeCountersToZero(floatTypeLabel)
return nil
}

// Run implements Test.
func (t *RecordingRuleReadSeriesTest) Run(ctx context.Context, now time.Time) error {
// Collect all errors on this test run
errs := new(multierror.MultiError)

t.RunInner(ctx, now, errs)

return errs.Err()
}

func (t *RecordingRuleReadSeriesTest) RunInner(
ctx context.Context,
now time.Time,
errs *multierror.MultiError,
// records *MetricHistory,
) {

//queryRanges, queryInstants, err := t.getQueryTimeRanges(now, records)
//if err != nil {
// errs.Add(err)
//}

queryMetric := queryRecordingRule(recordingRuleMetricName, "5m")
matrix, err := t.client.QueryInstantRangeVector(ctx, queryMetric, now, WithResultsCacheEnabled(false))
if err != nil {
level.Warn(t.logger).Log("msg", "Failed to execute instant query", "err", err)
errs.Add(errors.Wrap(err, "failed to execute instant query"))
return
}

if len(matrix) != 1 {
errs.Add(fmt.Errorf("expected 1 series in the result but got %d", len(matrix)))
return
}

samples := matrix[0].Values
if len(samples) == 0 {
errs.Add(errors.New("expected at least one sample in the result"))
return
}

latestSample := samples[len(samples)-1]
ts := time.UnixMilli(int64(latestSample.Timestamp)).UTC()

updatedNow := time.Now().UTC()
fmt.Println("updatedNow", updatedNow)
fmt.Println("latest sample", ts)
fmt.Println("difference", updatedNow.Sub(ts))
if now.Sub(ts) > 1*time.Minute {
errs.Add(fmt.Errorf("latest sample is too old: %s", ts))
return
}
}
6 changes: 6 additions & 0 deletions pkg/continuoustest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func init() {
}
}

type queryRecordingRuleFunc func(metricName, rangeVectorDuration string) string

func queryRecordingRule(metricName, rangeVectorDuration string) string {
return fmt.Sprintf("%s[%s]", metricName, rangeVectorDuration)
}

type querySumFunc func(metricName string) string

func querySumFloat(metricName string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/continuoustest/write_read_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (t *WriteReadSeriesTest) runInstantQueryAndVerifyResult(ctx context.Context

t.metrics.queriesTotal.WithLabelValues(typeLabel).Inc()
queryStart := time.Now()
vector, err := t.client.Query(ctx, metricSumQuery, ts, WithResultsCacheEnabled(resultsCacheEnabled))
vector, err := t.client.QueryInstant(ctx, metricSumQuery, ts, WithResultsCacheEnabled(resultsCacheEnabled))
t.metrics.queriesLatency.WithLabelValues(typeLabel, strconv.FormatBool(resultsCacheEnabled)).Observe(time.Since(queryStart).Seconds())
if err != nil {
t.metrics.queriesFailedTotal.WithLabelValues(typeLabel).Inc()
Expand Down
Loading
Loading