Skip to content

Commit

Permalink
Merge pull request #531 from grafana/chore/sync-prometheus
Browse files Browse the repository at this point in the history
 Sync with latest Prometheus upstream
  • Loading branch information
aknuds1 authored Sep 21, 2023
2 parents 7d713e1 + 85df1d4 commit 320f0c9
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 127 deletions.
3 changes: 2 additions & 1 deletion cmd/promtool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func main() {
analyzeBlockID := tsdbAnalyzeCmd.Arg("block id", "Block to analyze (default is the last block).").String()
analyzeLimit := tsdbAnalyzeCmd.Flag("limit", "How many items to show in each list.").Default("20").Int()
analyzeRunExtended := tsdbAnalyzeCmd.Flag("extended", "Run extended analysis.").Bool()
analyzeMatchers := tsdbAnalyzeCmd.Flag("match", "Series selector to analyze. Only 1 set of matchers is supported now.").String()

tsdbListCmd := tsdbCmd.Command("list", "List tsdb blocks.")
listHumanReadable := tsdbListCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
Expand Down Expand Up @@ -372,7 +373,7 @@ func main() {
os.Exit(checkErr(benchmarkWrite(*benchWriteOutPath, *benchSamplesFile, *benchWriteNumMetrics, *benchWriteNumScrapes)))

case tsdbAnalyzeCmd.FullCommand():
os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended)))
os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended, *analyzeMatchers)))

case tsdbListCmd.FullCommand():
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
Expand Down
2 changes: 1 addition & 1 deletion cmd/promtool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s

// Encode the request body into snappy encoding.
compressed := snappy.Encode(nil, raw)
err = client.Store(context.Background(), compressed)
err = client.Store(context.Background(), compressed, 0)
if err != nil {
fmt.Fprintln(os.Stderr, " FAILED:", err)
return false
Expand Down
66 changes: 53 additions & 13 deletions cmd/promtool/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,17 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
return db, b, nil
}

func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool) error {
func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool, matchers string) error {
var (
selectors []*labels.Matcher
err error
)
if len(matchers) > 0 {
selectors, err = parser.ParseMetricSelector(matchers)
if err != nil {
return err
}
}
db, block, err := openBlock(path, blockID)
if err != nil {
return err
Expand All @@ -426,14 +436,17 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
fmt.Printf("Block ID: %s\n", meta.ULID)
// Presume 1ms resolution that Prometheus uses.
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
fmt.Printf("Total Series: %d\n", meta.Stats.NumSeries)
if len(matchers) > 0 {
fmt.Printf("Matcher: %s\n", matchers)
}
ir, err := block.Index()
if err != nil {
return err
}
defer ir.Close()

allLabelNames, err := ir.LabelNames(ctx)
allLabelNames, err := ir.LabelNames(ctx, selectors...)
if err != nil {
return err
}
Expand All @@ -460,10 +473,30 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
labelpairsUncovered := map[string]uint64{}
labelpairsCount := map[string]uint64{}
entries := 0
p, err := ir.Postings(ctx, "", "") // The special all key.
if err != nil {
return err
var (
p index.Postings
refs []storage.SeriesRef
)
if len(matchers) > 0 {
p, err = tsdb.PostingsForMatchers(ctx, ir, selectors...)
if err != nil {
return err
}
// Expand refs first and cache in memory.
// So later we don't have to expand again.
refs, err = index.ExpandPostings(p)
if err != nil {
return err
}
fmt.Printf("Matched series: %d\n", len(refs))
p = index.NewListPostings(refs)
} else {
p, err = ir.Postings(ctx, "", "") // The special all key.
if err != nil {
return err
}
}

chks := []chunks.Meta{}
builder := labels.ScratchBuilder{}
for p.Next() {
Expand Down Expand Up @@ -512,7 +545,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten

postingInfos = postingInfos[:0]
for _, n := range allLabelNames {
values, err := ir.SortedLabelValues(ctx, n)
values, err := ir.SortedLabelValues(ctx, n, selectors...)
if err != nil {
return err
}
Expand All @@ -528,7 +561,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten

postingInfos = postingInfos[:0]
for _, n := range allLabelNames {
lv, err := ir.SortedLabelValues(ctx, n)
lv, err := ir.SortedLabelValues(ctx, n, selectors...)
if err != nil {
return err
}
Expand All @@ -538,7 +571,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
printInfo(postingInfos)

postingInfos = postingInfos[:0]
lv, err := ir.SortedLabelValues(ctx, "__name__")
lv, err := ir.SortedLabelValues(ctx, "__name__", selectors...)
if err != nil {
return err
}
Expand All @@ -547,6 +580,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
if err != nil {
return err
}
postings = index.Intersect(postings, index.NewListPostings(refs))
count := 0
for postings.Next() {
count++
Expand All @@ -560,18 +594,24 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
printInfo(postingInfos)

if runExtended {
return analyzeCompaction(ctx, block, ir)
return analyzeCompaction(ctx, block, ir, selectors)
}

return nil
}

func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) {
n, v := index.AllPostingsKey()
postingsr, err := indexr.Postings(ctx, n, v)
func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader, matchers []*labels.Matcher) (err error) {
var postingsr index.Postings
if len(matchers) > 0 {
postingsr, err = tsdb.PostingsForMatchers(ctx, indexr, matchers...)
} else {
n, v := index.AllPostingsKey()
postingsr, err = indexr.Postings(ctx, n, v)
}
if err != nil {
return err
}

chunkr, err := block.Chunks()
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions docs/command-line/promtool.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ Analyze churn, label pair cardinality and compaction efficiency.
| --- | --- | --- |
| <code class="text-nowrap">--limit</code> | How many items to show in each list. | `20` |
| <code class="text-nowrap">--extended</code> | Run extended analysis. | |
| <code class="text-nowrap">--match</code> | Series selector to analyze. Only 1 set of matchers is supported now. | |



Expand Down
51 changes: 17 additions & 34 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,41 +1190,24 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
}

for si, series := range matrixes[i] {
for _, point := range series.Floats {
if point.T == ts {
if ev.currentSamples < ev.maxSamples {
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: point.F, T: ts})
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}

// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Floats = series.Floats[1:]
ev.currentSamples++
} else {
ev.error(ErrTooManySamples(env))
}
}
break
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
matrixes[i][si].Histograms = series.Histograms[1:]
default:
continue
}
for _, point := range series.Histograms {
if point.T == ts {
if ev.currentSamples < ev.maxSamples {
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: point.H, T: ts})
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}

// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Histograms = series.Histograms[1:]
ev.currentSamples++
} else {
ev.error(ErrTooManySamples(env))
}
}
break
if prepSeries != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
args[i] = vectors[i]
Expand Down
12 changes: 7 additions & 5 deletions storage/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type RecoverableError struct {

// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
// and encoded bytes from codec.go.
func (c *Client) Store(ctx context.Context, req []byte) error {
func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req))
if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not
Expand All @@ -207,6 +207,10 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", UserAgent)
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if attempt > 0 {
httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
}

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

Expand All @@ -232,10 +236,8 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
if httpResp.StatusCode/100 == 5 {
return RecoverableError{err, defaultBackoff}
}
if c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests {
if httpResp.StatusCode/100 == 5 ||
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
}
return err
Expand Down
68 changes: 42 additions & 26 deletions storage/remote/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
c, err := NewWriteClient(hash, conf)
require.NoError(t, err)

err = c.Store(context.Background(), []byte{})
err = c.Store(context.Background(), []byte{}, 0)
if test.err != nil {
require.EqualError(t, err, test.err.Error())
} else {
Expand All @@ -85,12 +85,22 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
}

func TestClientRetryAfter(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, longErrMessage, http.StatusTooManyRequests)
}),
)
defer server.Close()
setupServer := func(statusCode int) *httptest.Server {
return httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Retry-After", "5")
http.Error(w, longErrMessage, statusCode)
}),
)
}

getClientConfig := func(serverURL *url.URL, retryOnRateLimit bool) *ClientConfig {
return &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
RetryOnRateLimit: retryOnRateLimit,
}
}

getClient := func(conf *ClientConfig) WriteClient {
hash, err := toHash(conf)
Expand All @@ -100,30 +110,36 @@ func TestClientRetryAfter(t *testing.T) {
return c
}

serverURL, err := url.Parse(server.URL)
require.NoError(t, err)

conf := &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
RetryOnRateLimit: false,
testCases := []struct {
name string
statusCode int
retryOnRateLimit bool
expectedRecoverable bool
expectedRetryAfter model.Duration
}{
{"TooManyRequests - No Retry", http.StatusTooManyRequests, false, false, 0},
{"TooManyRequests - With Retry", http.StatusTooManyRequests, true, true, 5 * model.Duration(time.Second)},
{"InternalServerError", http.StatusInternalServerError, false, true, 5 * model.Duration(time.Second)}, // HTTP 5xx errors do not depend on retryOnRateLimit.
}

var recErr RecoverableError
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server := setupServer(tc.statusCode)
defer server.Close()

c := getClient(conf)
err = c.Store(context.Background(), []byte{})
require.False(t, errors.As(err, &recErr), "Recoverable error not expected.")
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)

conf = &ClientConfig{
URL: &config_util.URL{URL: serverURL},
Timeout: model.Duration(time.Second),
RetryOnRateLimit: true,
}
c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))

c = getClient(conf)
err = c.Store(context.Background(), []byte{})
require.True(t, errors.As(err, &recErr), "Recoverable error was expected.")
var recErr RecoverableError
err = c.Store(context.Background(), []byte{}, 0)
require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
if tc.expectedRecoverable {
require.Equal(t, tc.expectedRetryAfter, err.(RecoverableError).retryAfter)
}
})
}
}

func TestRetryAfterDuration(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (m *queueManagerMetrics) unregister() {
// external timeseries database.
type WriteClient interface {
// Store stores the given samples in the remote storage.
Store(context.Context, []byte) error
Store(context.Context, []byte, int) error
// Name uniquely identifies the remote storage.
Name() string
// Endpoint is the remote read or write endpoint for the storage client.
Expand Down Expand Up @@ -552,7 +552,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
}

begin := time.Now()
err := t.storeClient.Store(ctx, req)
err := t.storeClient.Store(ctx, req, try)
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

if err != nil {
Expand Down Expand Up @@ -1526,7 +1526,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
err := s.qm.client().Store(ctx, *buf)
err := s.qm.client().Store(ctx, *buf, try)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

if err != nil {
Expand Down
Loading

0 comments on commit 320f0c9

Please sign in to comment.