MemPostings: keep a map of label values slices
While investigating lock contention on `MemPostings`, we saw that much
of the locking happens in `LabelValues` and `PostingsForLabelMatching`,
both of which copy the label values slices while holding the mutex.

This adds an extra map that holds an append-only slice of label values
for each label name. Since the slice is append-only, it can be copied
without holding the mutex.

Signed-off-by: Oleg Zaytsev <[email protected]>
colega committed Nov 19, 2024
1 parent 5239f4a commit 91f28e3
Showing 1 changed file with 64 additions and 46 deletions.
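For illustration, the idea from the commit message can be sketched as a minimal, self-contained Go program. This is not the actual patch: `indexSketch` and the `uint64` series refs are stand-ins for `MemPostings` and `storage.SeriesRef`.

package main

import (
    "fmt"
    "slices"
    "sync"
)

// indexSketch keeps, next to the postings map, an append-only slice of
// values per label name, mirroring the new lvs map in MemPostings.
type indexSketch struct {
    mtx sync.RWMutex
    m   map[string]map[string][]uint64 // label name -> label value -> series refs
    lvs map[string][]string            // label name -> append-only label values
}

func newIndexSketch() *indexSketch {
    return &indexSketch{
        m:   map[string]map[string][]uint64{},
        lvs: map[string][]string{},
    }
}

// add appends the value to lvs only the first time it is seen for this name,
// so the shared slice only ever grows, and only while the write lock is held.
func (ix *indexSketch) add(ref uint64, name, value string) {
    ix.mtx.Lock()
    defer ix.mtx.Unlock()
    vm, ok := ix.m[name]
    if !ok {
        vm = map[string][]uint64{}
        ix.m[name] = vm
    }
    if _, ok := vm[value]; !ok {
        ix.lvs[name] = append(ix.lvs[name], value)
    }
    vm[value] = append(vm[value], ref)
}

// labelValues holds the read lock only long enough to grab the slice header;
// cloning happens outside the lock, which is safe because the slice is append-only.
func (ix *indexSketch) labelValues(name string) []string {
    ix.mtx.RLock()
    values := ix.lvs[name]
    ix.mtx.RUnlock()
    return slices.Clone(values)
}

func main() {
    ix := newIndexSketch()
    ix.add(1, "job", "api")
    ix.add(2, "job", "db")
    fmt.Println(ix.labelValues("job")) // [api db]
}

The write path appends a value only when it is new for that name, which is what makes the lock-free clone on the read path safe.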
110 changes: 64 additions & 46 deletions tsdb/index/postings.go
@@ -32,6 +32,8 @@ import (
"github.com/prometheus/prometheus/storage"
)

const exponentialSliceGrowthFactor = 2

var allPostingsKey = labels.Label{}

// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
@@ -57,13 +59,15 @@ var ensureOrderBatchPool = sync.Pool{
type MemPostings struct {
mtx sync.RWMutex
m map[string]map[string][]storage.SeriesRef
lvs map[string][]string
ordered bool
}

// NewMemPostings returns a memPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
lvs: make(map[string][]string, 512),
ordered: true,
}
}
@@ -73,6 +77,7 @@ func NewMemPostings() *MemPostings {
func NewUnorderedMemPostings() *MemPostings {
return &MemPostings{
m: make(map[string]map[string][]storage.SeriesRef, 512),
lvs: make(map[string][]string, 512),
ordered: false,
}
}
@@ -83,9 +88,9 @@ func (p *MemPostings) Symbols() StringIter {

// Add all the strings to a map to de-duplicate.
symbols := make(map[string]struct{}, 512)
for n, e := range p.m {
for n, lvs := range p.lvs {
symbols[n] = struct{}{}
for v := range e {
for _, v := range lvs {
symbols[v] = struct{}{}
}
}
@@ -145,13 +150,14 @@ func (p *MemPostings) LabelNames() []string {
// LabelValues returns label values for the given name.
func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()
values := p.lvs[name]
p.mtx.RUnlock()

values := make([]string, 0, len(p.m[name]))
for v := range p.m[name] {
values = append(values, v)
}
return values
// The slice from p.lvs[name] is shared between all readers, and it is append-only.
// Since it's shared, we need to make a copy of it before returning it to make
// sure that no caller modifies the original one by sorting it or filtering it.
// Since it's append-only, we can do this while not holding the mutex anymore.
return slices.Clone(values)
}

// PostingsStats contains cardinality based statistics for postings.
@@ -294,6 +300,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.mtx.Lock()
defer p.mtx.Unlock()

affectedLabelNames := map[string]struct{}{}
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
@@ -306,10 +313,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
affectedLabelNames[l.Name] = struct{}{}
}
}

@@ -339,6 +343,35 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
}
}
process(allPostingsKey)

// Now we need to update the label values slices.
i = 0
for name := range affectedLabelNames {
i++
// Same mutex pause as above.
if i%512 == 0 {
p.mtx.Unlock()
p.mtx.RLock()
p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section.
time.Sleep(time.Millisecond)
p.mtx.Lock()
}

if len(p.m[name]) == 0 {
// Delete the label name key if we deleted all values.
delete(p.m, name)
delete(p.lvs, name)
continue
}

// Create the new slice with enough room to grow without reallocating.
// We have deleted values here, so there's definitely some churn, so be prepared for it.
lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name]))
for v := range p.m[name] {
lvs = append(lvs, v)
}
p.lvs[name] = lvs
}
}

// Iter calls f for each postings list. It aborts if f returns an error and returns it.
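The "mutex pause" used above in Delete (the comment "Same mutex pause as above" refers to an earlier pause that sits in the collapsed part of this diff) can be illustrated in isolation. This is a hedged sketch with made-up names (`store`, `pruneAll`), not the Prometheus code:

package main

import (
    "fmt"
    "sync"
    "time"
)

type store struct {
    mtx  sync.RWMutex
    data map[int]int
}

// pruneAll simulates a long, write-locked loop that periodically drops the
// write lock so queued readers can make progress before it continues.
func (s *store) pruneAll(keys []int) {
    s.mtx.Lock()
    defer s.mtx.Unlock()

    for i, k := range keys {
        // Same idea as the pause in MemPostings.Delete: every 512 keys,
        // release the write lock, let pending readers run, and take it back.
        if i > 0 && i%512 == 0 {
            s.mtx.Unlock()
            s.mtx.RLock()
            s.mtx.RUnlock() // intentionally empty critical section, just to yield to readers
            time.Sleep(time.Millisecond)
            s.mtx.Lock()
        }
        delete(s.data, k)
    }
}

func main() {
    s := &store{data: map[int]int{}}
    keys := make([]int, 2048)
    for i := range keys {
        s.data[i] = i
        keys[i] = i
    }
    s.pruneAll(keys)
    fmt.Println(len(s.data)) // 0
}

Dropping the write lock briefly bounds how long readers (and the write path) can be blocked by a large delete.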
@@ -370,7 +403,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {

func appendWithExponentialGrowth[T any](a []T, v T) []T {
if cap(a) < len(a)+1 {
newList := make([]T, len(a), len(a)*2+1)
newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1)
copy(newList, a)
a = newList
}
@@ -383,7 +416,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
nm = map[string][]storage.SeriesRef{}
p.m[l.Name] = nm
}
list := appendWithExponentialGrowth(nm[l.Value], id)
vm, ok := nm[l.Value]
if !ok {
p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value)
}
list := appendWithExponentialGrowth(vm, id)
nm[l.Value] = list

if !p.ordered {
@@ -402,25 +439,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}

func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
// We'll copy the values into a slice and then match over that,
// We'll take the label values slice and then match over that,
// this way we don't need to hold the mutex while we're matching,
// which can be slow (seconds) if the match function is a huge regex.
// Holding this lock prevents new series from being added (slows down the write path)
// and blocks the compaction process.
vals := p.labelValues(name)
for i, count := 0, 1; i < len(vals); count++ {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
//
// We just need to make sure we don't modify the slice we took,
// so we'll append matching values to a different one.
p.mtx.RLock()
readOnlyLabelValues := p.lvs[name]
p.mtx.RUnlock()

vals := make([]string, 0, len(readOnlyLabelValues))
for i, v := range readOnlyLabelValues {
if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}

if match(vals[i]) {
i++
continue
if match(v) {
vals = append(vals, v)
}

// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}

// If none matched (or this label had no values), no need to grab the lock again.
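The rewritten matching loop above boils down to filtering a shared, read-only slice into a fresh one while checking the context every few iterations. A rough standalone sketch, with an assumed `checkEveryN` constant standing in for `checkContextEveryNIterations`:

package main

import (
    "context"
    "fmt"
    "strings"
)

const checkEveryN = 128 // stand-in for checkContextEveryNIterations

// matchValues appends matching values to a new slice; the shared slice is never
// written to, so it can be read without holding any lock.
func matchValues(ctx context.Context, shared []string, match func(string) bool) ([]string, error) {
    out := make([]string, 0, len(shared))
    for i, v := range shared {
        if i%checkEveryN == 0 && ctx.Err() != nil {
            return nil, ctx.Err()
        }
        if match(v) {
            out = append(out, v)
        }
    }
    return out, nil
}

func main() {
    shared := []string{"api", "db", "api-canary", "worker"}
    got, err := matchValues(context.Background(), shared, func(s string) bool {
        return strings.HasPrefix(s, "api")
    })
    fmt.Println(got, err) // [api api-canary] <nil>
}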
@@ -447,27 +486,6 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
return Merge(ctx, its...)
}

// labelValues returns a slice of label values for the given label name.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
p.mtx.RLock()
defer p.mtx.RUnlock()

e := p.m[name]
if len(e) == 0 {
return nil
}

vals := make([]string, 0, len(e))
for v, srs := range e {
if len(srs) > 0 {
vals = append(vals, v)
}
}

return vals
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {
