From 5c5011be2afafa0ea7d3d8f8839671a5fdc833c8 Mon Sep 17 00:00:00 2001 From: Fiona Liao Date: Mon, 9 Oct 2023 16:43:45 +0100 Subject: [PATCH] Add counter reset test to head_test.go --- tsdb/head_read.go | 4 +- tsdb/head_test.go | 118 ++++++++++++++++++++++++++++++++++++++++++ tsdb/ooo_head.go | 12 ++++- tsdb/ooo_head_test.go | 3 +- 4 files changed, 133 insertions(+), 4 deletions(-) diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 0649a954801..d642bd572ed 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -15,10 +15,11 @@ package tsdb import ( "context" - "github.com/prometheus/prometheus/tsdb/tsdbutil" "math" "sync" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/go-kit/log/level" "github.com/pkg/errors" "golang.org/x/exp/slices" @@ -501,6 +502,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper continue } if c.meta.Ref == oooHeadRef { + //TODO: make sure this is tested (ooo head chunk is read and preserves counter resets) chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(meta.OOOLastMinTime, meta.OOOLastMaxTime) if err != nil { return nil, errors.Wrap(err, "failed to convert ooo head chunk to encoded chunk(s)") diff --git a/tsdb/head_test.go b/tsdb/head_test.go index a0f4f26c5b9..6f06bd99fb9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -4177,6 +4177,124 @@ func TestHistogramCounterResetHeader(t *testing.T) { } } +func TestOOOHistogramCounterResetHeaders(t *testing.T) { + for _, floatHisto := range []bool{true, false} { + t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) { + l := labels.FromStrings("a", "b") + head, _ := newTestHead(t, 1000, wlog.CompressionNone, true) + head.opts.OutOfOrderCapMax.Store(5) + + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + appendHistogram := func(ts int64, h *histogram.Histogram) { + app := head.Appender(context.Background()) + var err error + if floatHisto { + _, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) + } else { + _, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) + } + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + var expOOOMmappedHeaders []chunkenc.CounterResetHeader + checkOOOExpCounterResetHeader := func(newHeaders ...chunkenc.CounterResetHeader) { + expOOOMmappedHeaders = append(expOOOMmappedHeaders, newHeaders...) + + ms, _, err := head.getOrCreate(l.Hash(), l) + require.NoError(t, err) + + require.Len(t, ms.ooo.oooMmappedChunks, len(expOOOMmappedHeaders)) + + for i, mmapChunk := range ms.ooo.oooMmappedChunks { + chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) + require.NoError(t, err) + if floatHisto { + require.Equal(t, expOOOMmappedHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) + } else { + require.Equal(t, expOOOMmappedHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader()) + } + } + } + + h := tsdbutil.GenerateTestHistograms(1)[0] + h.PositiveBuckets = []int64{100, 1, 1, 1} + h.NegativeBuckets = []int64{100, 1, 1, 1} + h.Count = 1000 + + // Append an in-order histogram, so the rest of the samples can be detected as OOO. + appendHistogram(1000, h) + + // OOO histogram + for i := 1; i <= 5; i++ { + h.Count = 1000 + uint64(i) + appendHistogram(100+int64(i), h) + } + // Nothing mmapped yet. + checkOOOExpCounterResetHeader() + + // 6th observation (which triggers a head chunk mmapping). + h.Count = 1002 + appendHistogram(int64(112), h) + + // One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)]. + checkOOOExpCounterResetHeader(chunkenc.UnknownCounterReset) + + // Add more samples, there's a counter reset at ts 122. + h.Count = 1001 + appendHistogram(int64(110), h) + h.Count = 904 + appendHistogram(int64(124), h) + h.Count = 903 + appendHistogram(int64(123), h) + h.Count = 902 + appendHistogram(int64(122), h) + + // New samples not mmapped yet. + checkOOOExpCounterResetHeader() + + // 11th observation (which triggers another head chunk mmapping). + h.Count = 2000 + appendHistogram(int64(200), h) + + // Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)]. + checkOOOExpCounterResetHeader(chunkenc.UnknownCounterReset, chunkenc.CounterReset) + + // Count is lower than previous sample at ts 200, so the NotCounterReset is ignored. + h.Count = 1000 + h.CounterResetHint = histogram.NotCounterReset + appendHistogram(int64(205), h) + + // The hint is currently not respected. This is fixed by https://github.com/prometheus/prometheus/commit/b6f903b5f92b5458ad2244d9f442f7f859c01eb3 + h.Count = 2010 + h.CounterResetHint = histogram.CounterReset + appendHistogram(int64(210), h) + + h.Count = 2020 + h.CounterResetHint = histogram.UnknownCounterReset + appendHistogram(int64(220), h) + + h.Count = 2005 + appendHistogram(int64(215), h) + + // 15th observation (which triggers another head chunk mmapping). + h.Count = 2015 + appendHistogram(int64(216), h) + + //TODO: counter reset at start of chunk + + //FIXED: minTime and maxTime incorrect + // [(200, 2000)] [(205, 1000)], (210, 2010)], [(215, 2015), (220, 2020)] + //TODO: check time ranges for chunks as well + checkOOOExpCounterResetHeader(chunkenc.UnknownCounterReset, chunkenc.CounterReset, chunkenc.CounterReset, chunkenc.CounterReset) + }) + } +} + func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { dir := t.TempDir() opts := DefaultOptions() diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 391539f8103..225183c79fb 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -99,6 +99,10 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } else if s.fh != nil { encoding = chunkenc.EncFloatHistogram } + + // prevApp is the appender for the previous sample. + prevApp := app + if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram if prevEncoding != chunkenc.EncNone { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) @@ -123,7 +127,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error case chunkenc.EncXOR: app.Append(s.t, s.f) case chunkenc.EncHistogram: - prevHApp, _ := app.(*chunkenc.HistogramAppender) + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) var ( newChunk chunkenc.Chunk recoded bool @@ -134,9 +139,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } chunk = newChunk + cmint = s.t } case chunkenc.EncFloatHistogram: - prevHApp, _ := app.(*chunkenc.FloatHistogramAppender) + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) var ( newChunk chunkenc.Chunk recoded bool @@ -147,6 +154,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } chunk = newChunk + cmint = s.t } } cmaxt = s.t diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index dca17aaa7c1..d3d1557d24b 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -14,9 +14,10 @@ package tsdb import ( - "github.com/prometheus/prometheus/tsdb/tsdbutil" "testing" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/stretchr/testify/require" )