Skip to content

Commit

Permalink
Add counter reset test for mmapped OOO chunks (#544)
Browse files Browse the repository at this point in the history
* Add counter reset test for mmapped OOO chunks

Fixed a couple of bugs that were discovered while writing the test:
- mint for ooo chunk needed to be set when a new chunk was created
- first chunk was using the wrong previous appender

Signed-off-by: Fiona Liao <[email protected]>

* nolint:staticcheck

---------

Signed-off-by: Fiona Liao <[email protected]>
  • Loading branch information
fionaliao committed Nov 29, 2023
1 parent e78f876 commit 40f2f3b
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 9 deletions.
184 changes: 184 additions & 0 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4348,6 +4348,190 @@ func TestHistogramCounterResetHeader(t *testing.T) {
}
}

type expOOOMmappedChunks struct {
header chunkenc.CounterResetHeader
mint, maxt int64
numSamples uint16
}

func TestOOOHistogramCounterResetHeaders(t *testing.T) {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
l := labels.FromStrings("a", "b")
head, _ := newTestHead(t, 1000, wlog.CompressionNone, true)
head.opts.OutOfOrderCapMax.Store(5)

t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))

appendHistogram := func(ts int64, h *histogram.Histogram) {
app := head.Appender(context.Background())
var err error
if floatHisto {
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat())
} else {
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil)
}
require.NoError(t, err)
require.NoError(t, app.Commit())
}

var expChunks []expOOOMmappedChunks
checkOOOExpCounterResetHeader := func(newChunks ...expOOOMmappedChunks) {
expChunks = append(expChunks, newChunks...)

ms, _, err := head.getOrCreate(l.Hash(), l)
require.NoError(t, err)

require.Len(t, ms.ooo.oooMmappedChunks, len(expChunks))

for i, mmapChunk := range ms.ooo.oooMmappedChunks {
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref)
require.NoError(t, err)
if floatHisto {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
} else {
require.Equal(t, expChunks[i].header, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, expChunks[i].mint, mmapChunk.minTime)
require.Equal(t, expChunks[i].maxt, mmapChunk.maxTime)
require.Equal(t, expChunks[i].numSamples, mmapChunk.numSamples)
}
}

h := tsdbutil.GenerateTestHistograms(1)[0]
h.PositiveBuckets = []int64{100, 1, 1, 1}
h.NegativeBuckets = []int64{100, 1, 1, 1}
h.Count = 1000

// Append an in-order histogram, so the rest of the samples can be detected as OOO.
appendHistogram(1000, h)

// OOO histogram
for i := 1; i <= 5; i++ {
h.Count = 1000 + uint64(i)
appendHistogram(100+int64(i), h)
}
// Nothing mmapped yet.
checkOOOExpCounterResetHeader()

// 6th observation (which triggers a head chunk mmapping).
h.Count = 1002
appendHistogram(int64(112), h)

// One mmapped chunk with (ts, val) [(101, 1001), (102, 1002), (103, 1003), (104, 1004), (105, 1005)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 101,
maxt: 105,
numSamples: 5,
})

// Add more samples, there's a counter reset at ts 122.
h.Count = 1001
appendHistogram(int64(110), h)
h.Count = 904
appendHistogram(int64(124), h)
h.Count = 903
appendHistogram(int64(123), h)
h.Count = 902
appendHistogram(int64(122), h)

// New samples not mmapped yet.
checkOOOExpCounterResetHeader()

// 11th observation (which triggers another head chunk mmapping).
h.Count = 2000
appendHistogram(int64(200), h)

// Two new mmapped chunks [(110, 1001), (112, 1002)], [(122, 902), (123, 903), (124, 904)].
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 110,
maxt: 112,
numSamples: 2,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 122,
maxt: 124,
numSamples: 3,
},
)

// Count is lower than previous sample at ts 200, so the NotCounterReset is ignored.
h.Count = 1000
h.CounterResetHint = histogram.NotCounterReset
appendHistogram(int64(205), h)

h.Count = 2010
h.CounterResetHint = histogram.CounterReset
appendHistogram(int64(210), h)

h.Count = 2020
h.CounterResetHint = histogram.UnknownCounterReset
appendHistogram(int64(220), h)

h.Count = 2005
appendHistogram(int64(215), h)

// 16th observation (which triggers another head chunk mmapping).
h.Count = 4000
appendHistogram(int64(350), h)

// Four new mmapped chunks: [(200, 2000)] [(205, 1000)], [(210, 2010)], [(215, 2015), (220, 2020)]
checkOOOExpCounterResetHeader(
expOOOMmappedChunks{
header: chunkenc.UnknownCounterReset,
mint: 200,
maxt: 200,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 205,
maxt: 205,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 210,
maxt: 210,
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 215,
maxt: 220,
numSamples: 2,
},
)

// Adding five more samples (21 in total), so another mmapped chunk is created.
h.CounterResetHint = histogram.CounterReset
h.Count = 3000
appendHistogram(300, h)

h.CounterResetHint = histogram.UnknownCounterReset
for i := 1; i <= 4; i++ {
h.Count = 3000 + uint64(i)
appendHistogram(300+int64(i), h)
}

// One mmapped chunk with (ts, val) [(300, 3000), (301, 3001), (302, 3002), (303, 3003), (350, 4000)].
checkOOOExpCounterResetHeader(expOOOMmappedChunks{
header: chunkenc.CounterReset,
mint: 300,
maxt: 350,
numSamples: 5,
})
})
}
}

func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir()
opts := DefaultOptions()
Expand Down
14 changes: 7 additions & 7 deletions tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ Outer:
idx := uint64(mSeries.ref) % uint64(concurrency)
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
}
seriesPool.Put(v)
seriesPool.Put(v) //nolint:staticcheck
case []record.RefSample:
samples := v
minValidTime := h.minValidTime.Load()
Expand Down Expand Up @@ -288,7 +288,7 @@ Outer:
}
samples = samples[m:]
}
samplesPool.Put(v)
samplesPool.Put(v) //nolint:staticcheck
case []tombstones.Stone:
for _, s := range v {
for _, itv := range s.Intervals {
Expand All @@ -302,12 +302,12 @@ Outer:
h.tombstones.AddInterval(s.Ref, itv)
}
}
tstonesPool.Put(v)
tstonesPool.Put(v) //nolint:staticcheck
case []record.RefExemplar:
for _, e := range v {
exemplarsInput <- e
}
exemplarsPool.Put(v)
exemplarsPool.Put(v) //nolint:staticcheck
case []record.RefHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
Expand Down Expand Up @@ -343,7 +343,7 @@ Outer:
}
samples = samples[m:]
}
histogramsPool.Put(v)
histogramsPool.Put(v) //nolint:staticcheck
case []record.RefFloatHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
Expand Down Expand Up @@ -379,7 +379,7 @@ Outer:
}
samples = samples[m:]
}
floatHistogramsPool.Put(v)
floatHistogramsPool.Put(v) //nolint:staticcheck
case []record.RefMetadata:
for _, m := range v {
s := h.series.getByID(m.Ref)
Expand All @@ -393,7 +393,7 @@ Outer:
Help: m.Help,
}
}
metadataPool.Put(v)
metadataPool.Put(v) //nolint:staticcheck
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
Expand Down
12 changes: 10 additions & 2 deletions tsdb/ooo_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
} else if s.fh != nil {
encoding = chunkenc.EncFloatHistogram
}

// prevApp is the appender for the previous sample.
prevApp := app

if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
if prevEncoding != chunkenc.EncNone {
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
Expand All @@ -126,7 +130,8 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
case chunkenc.EncXOR:
app.Append(s.t, s.f)
case chunkenc.EncHistogram:
prevHApp, _ := app.(*chunkenc.HistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -137,9 +142,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
case chunkenc.EncFloatHistogram:
prevHApp, _ := app.(*chunkenc.FloatHistogramAppender)
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
var (
newChunk chunkenc.Chunk
recoded bool
Expand All @@ -150,6 +157,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
chunk = newChunk
cmint = s.t
}
}
cmaxt = s.t
Expand Down

0 comments on commit 40f2f3b

Please sign in to comment.