Skip to content

Commit

Permalink
Add mmapped ooo chunks counter reset test
Browse files Browse the repository at this point in the history
  • Loading branch information
fionaliao committed Oct 10, 2023
1 parent 0b00227 commit 0d8f62b
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 4 deletions.
4 changes: 3 additions & 1 deletion tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package tsdb

import (
"context"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"math"
"sync"

"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -501,6 +502,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
continue
}
if c.meta.Ref == oooHeadRef {
//TODO: make sure this is tested (ooo head chunk is read and preserves counter resets)
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(meta.OOOLastMinTime, meta.OOOLastMaxTime)
if err != nil {
return nil, errors.Wrap(err, "failed to convert ooo head chunk to encoded chunk(s)")
Expand Down
187 changes: 187 additions & 0 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4177,6 +4177,193 @@ func TestHistogramCounterResetHeader(t *testing.T) {
}
}

type expOOOMmappedChunks struct {
header chunkenc.CounterResetHeader
mint, maxt int64
numSamples uint16
}

func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head.opts.OutOfOrderCapMax.Store(5)

t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))

appendHistogram := func(ts int64, h *histogram.Histogram) {
app := head.Appender(context.Background())
var err error
if floatHisto {
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat())
} else {
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil)
}
require.NoError(t, err)
require.NoError(t, app.Commit())
}

var expChunks []expOOOMmappedChunks
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
expChunks = append(expChunks, newChunks...)

ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)

require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))

for i, mmapChunk := range ms.ooo.oooMmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
if floatHisto {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expChunks[i].mint, mmapChunk.minTime)
require.Equal(t, expChunks[i].maxt, mmapChunk.maxTime)
require.Equal(t, expChunks[i].numSamples, mmapChunk.numSamples)
}
}

h := tsdbutil.GenerateTestHistograms(1)[0]
h.PositiveBuckets = []int64{100, 1, 1, 1}
h.NegativeBuckets = []int64{100, 1, 1, 1}
h.Count = 1000

// Append an in-order histogram, so the rest of the samples can be detected as OOO.
appendHistogram(1000, h)

// OOO histogram
for i := 1; i <= 5; i++ {
h.Count = 1000 + uint64(i)
appendHistogram(100+int64(i), h)
}
// Nothing mmapped yet.
checkOOOExpCounterResetHeader()

// 6th observation (which triggers a head chunk mmapping).
h.Count = 1002
appendHistogram(int64(112), h)

// One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 101,
maxt: 105,
numSamples: 5,
})

// Add more samples, there's a counter reset at ts 122.
h.Count = 1001
appendHistogram(int64(110), h)
h.Count = 904
appendHistogram(int64(124), h)
h.Count = 903
appendHistogram(int64(123), h)
h.Count = 902
appendHistogram(int64(122), h)

// New samples not mmapped yet.
checkOOOExpCounterResetHeader()

// 11th observation (which triggers another head chunk mmapping).
h.Count = 2000
appendHistogram(int64(200), h)

// Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)].
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 110,
maxt: 112,
numSamples: 2,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 122,
maxt: 124,
numSamples: 3,
},
)

// Count is lower than previous sample at ts 200, so the NotCounterReset is ignored.
h.Count = 1000
h.CounterResetHint = histogram.NotCounterReset
appendHistogram(int64(205), h)

//FIXME: The reset hint is currently not respected (so assertion is failing). This is fixed by https://github.com/prometheus/prometheus/commit/b6f903b5f92b5458ad2244d9f442f7f859c01eb3
h.Count = 2010
h.CounterResetHint = histogram.CounterReset
appendHistogram(int64(210), h)

h.Count = 2020
h.CounterResetHint = histogram.UnknownCounterReset
appendHistogram(int64(220), h)

h.Count = 2005
appendHistogram(int64(215), h)

// 16th observation (which triggers another head chunk mmapping).
h.Count = 4000
appendHistogram(int64(350), h)

// Four new mmapped chunks: [(200, 2000)] [(205, 1000)], [(210, 2010)], [(215, 2015), (220, 2020)]
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 200,
maxt: 200,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 205,
maxt: 205,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 210,
maxt: 210,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 215,
maxt: 220,
numSamples: 2,
},
)

// Adding five more samples (21 in total), so another mmapped chunk is created.
h.CounterResetHint = histogram.CounterReset
h.Count = 3000
appendHistogram(300, h)

h.CounterResetHint = histogram.UnknownCounterReset
for i := 1; i <= 4; i++ {
h.Count = 3000 + uint64(i)
appendHistogram(300+int64(i), h)
}

// One mmapped chunk with (ts, val) [(300, 3000), (301, 3001), (302, 3002), (303, 3003), (350, 4000)].
//FIXME: The reset hint is currently not respected (so assertion is failing). This is fixed by https://github.com/prometheus/prometheus/commit/b6f903b5f92b5458ad2244d9f442f7f859c01eb3
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 300,
maxt: 350,
numSamples: 5,
})

})
}
}

func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir()
opts := DefaultOptions()
Expand Down
12 changes: 10 additions & 2 deletions tsdb/ooo_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
} else if s.fh != nil {
encoding = chunkenc.EncFloatHistogram
}

// prevApp is the appender for the previous sample.
prevApp := app

if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
Expand All @@ -123,7 +127,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
case chunkenc.EncXOR:
app.Append(s.t, s.f)
case chunkenc.EncHistogram:
prevHApp, _ := app.(*chunkenc.HistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -134,9 +139,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
case chunkenc.EncFloatHistogram:
prevHApp, _ := app.(*chunkenc.FloatHistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -147,6 +154,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
}
cmaxt = s.t
Expand Down
3 changes: 2 additions & 1 deletion tsdb/ooo_head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
package tsdb

import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"testing"

"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/stretchr/testify/require"
)

Expand Down

0 comments on commit 0d8f62b

Please sign in to comment.