Skip to content

Commit

Permalink
Add Postings.Reset method
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Sep 11, 2023
1 parent 8447fef commit cbd8123
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 129 deletions.
49 changes: 1 addition & 48 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,6 @@ type Reader struct {

// Provides a cache mapping series labels hash by series ID.
cacheProvider ReaderCacheProvider

postingsCache map[uint64]Postings
}

type postingOffset struct {
Expand Down Expand Up @@ -1153,7 +1151,6 @@ func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Re
c: c,
postings: map[string][]postingOffset{},
cacheProvider: cacheProvider,
postingsCache: map[uint64]Postings{},
}

// Verify header.
Expand Down Expand Up @@ -1231,10 +1228,6 @@ func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Re
copy(l, v)
r.postings[k] = l
}

if err := r.populateCache(); err != nil {
return nil, err
}
}

r.nameSymbols = make(map[uint32]string, len(r.postings))
Expand All @@ -1254,47 +1247,6 @@ func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Re
return r, nil
}

func (r *Reader) populateCache() error {
for _, e := range r.postings {
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(e[0].off)
lastVal := e[len(e)-1].value

skip := 0
for d.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than to parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}

// Label value
v := yoloString(d.UvarintBytes())

postingsOff := d.Uvarint64()
// Read from the postings table
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get())
if err != nil {
return errors.Wrap(err, "decode postings")
}

r.postingsCache[postingsOff] = p

if v == lastVal {
break
}
}
}

return nil
}

// Version returns the file format version of the underlying index.
func (r *Reader) Version() int {
return r.version
Expand Down Expand Up @@ -1927,6 +1879,7 @@ func (dec *Decoder) PostingsInPlace(b []byte, p *bigEndianPostings) (int, error)
return 0, fmt.Errorf("unexpected postings length, should be %d bytes for %d postings, got %d bytes", 4*n, n, len(l))
}
p.list = l
p.Reset()
return n, nil
}

Expand Down
67 changes: 28 additions & 39 deletions tsdb/index/labelvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,20 @@ func (r *Reader) LabelValuesIntersectingPostings(name string, postings Postings)
dec: r.dec,
lastVal: lastVal,
postings: postings,
cache: r.postingsCache,
seen: map[string]bool{},
}
}

type intersectLabelValues struct {
d *encoding.Decbuf
b ByteSlice
dec *Decoder
postings Postings
lastVal string
skip int
cur string
exhausted bool
err error
cache map[uint64]Postings
its [2]Postings
seen map[string]bool
p1 bigEndianPostings
p2 bigEndianPostings
d *encoding.Decbuf
b ByteSlice
dec *Decoder
postings Postings
curPostings bigEndianPostings
lastVal string
skip int
cur string
exhausted bool
err error
}

func (it *intersectLabelValues) Next() bool {
Expand All @@ -229,43 +223,38 @@ func (it *intersectLabelValues) Next() bool {
v := yoloString(it.d.UvarintBytes())

postingsOff := it.d.Uvarint64()
p, ok := it.cache[postingsOff]
if ok {
p.(*bigEndianPostings).CloneInPlace(&it.p1)
} else {
// Read from the postings table
d2 := encoding.NewDecbufAt(it.b, int(postingsOff), castagnoliTable)
var err error
_, err = it.dec.PostingsInPlace(d2.Get(), &it.p1)
if err != nil {
it.err = errors.Wrap(err, "decode postings")
return false
}
// Read from the postings table
d2 := encoding.NewDecbufAt(it.b, int(postingsOff), castagnoliTable)
_, err := it.dec.PostingsInPlace(d2.Get(), &it.curPostings)
if err != nil {
it.err = errors.Wrap(err, "decode postings")
return false
}

it.exhausted = v == it.lastVal

it.postings.(*bigEndianPostings).CloneInPlace(&it.p2)
if !it.p1.Next() || !it.p2.Next() {
it.postings.Reset()
if !it.curPostings.Next() || !it.postings.Next() {
continue
}
cur := it.p1.At()
if it.p2.At() > cur {
cur = it.p2.At()
cur := it.curPostings.At()
if it.postings.At() > cur {
cur = it.postings.At()
}

// Find at least one intersecting series
for {
if !it.p1.Seek(cur) {
if !it.curPostings.Seek(cur) {
break
}
if it.p1.At() > cur {
cur = it.p1.At()
if it.curPostings.At() > cur {
cur = it.curPostings.At()
}
if !it.p2.Seek(cur) {
if !it.postings.Seek(cur) {
break
}
if it.p2.At() > cur {
cur = it.p2.At()
if it.postings.At() > cur {
cur = it.postings.At()
continue
}

Expand Down
Loading

0 comments on commit cbd8123

Please sign in to comment.