From 91f28e332031844d1278cb40897de68747e7544a Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Fri, 15 Nov 2024 18:04:12 +0100 Subject: [PATCH] MemPostings: keep a map of label values slices While investigating lock contention on `MemPostings`, we saw that lots of locking is happening in `LabelValues` and `PostingsForLabelsMatching`, both copying the label values slices while holding the mutex. This adds an extra map that holds an append-only label values slice for each one of the label names. Since the slice is append-only, it can be copied without holding the mutex. Signed-off-by: Oleg Zaytsev --- tsdb/index/postings.go | 110 ++++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 46 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 15536c69c..38aa3fb34 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -32,6 +32,8 @@ import ( "github.com/prometheus/prometheus/storage" ) +const exponentialSliceGrowthFactor = 2 + var allPostingsKey = labels.Label{} // AllPostingsKey returns the label key that is used to store the postings list of all existing IDs. @@ -57,6 +59,7 @@ var ensureOrderBatchPool = sync.Pool{ type MemPostings struct { mtx sync.RWMutex m map[string]map[string][]storage.SeriesRef + lvs map[string][]string ordered bool } @@ -64,6 +67,7 @@ type MemPostings struct { func NewMemPostings() *MemPostings { return &MemPostings{ m: make(map[string]map[string][]storage.SeriesRef, 512), + lvs: make(map[string][]string, 512), ordered: true, } } @@ -73,6 +77,7 @@ func NewMemPostings() *MemPostings { func NewUnorderedMemPostings() *MemPostings { return &MemPostings{ m: make(map[string]map[string][]storage.SeriesRef, 512), + lvs: make(map[string][]string, 512), ordered: false, } } @@ -83,9 +88,9 @@ func (p *MemPostings) Symbols() StringIter { // Add all the strings to a map to de-duplicate. symbols := make(map[string]struct{}, 512) - for n, e := range p.m { + for n, lvs := range p.lvs { symbols[n] = struct{}{} - for v := range e { + for _, v := range lvs { symbols[v] = struct{}{} } } @@ -145,13 +150,14 @@ func (p *MemPostings) LabelNames() []string { // LabelValues returns label values for the given name. func (p *MemPostings) LabelValues(_ context.Context, name string) []string { p.mtx.RLock() - defer p.mtx.RUnlock() + values := p.lvs[name] + p.mtx.RUnlock() - values := make([]string, 0, len(p.m[name])) - for v := range p.m[name] { - values = append(values, v) - } - return values + // The slice from p.lvs[name] is shared between all readers, and it is append-only. + // Since it's shared, we need to make a copy of it before returning it to make + // sure that no caller modifies the original one by sorting it or filtering it. + // Since it's append-only, we can do this while not holding the mutex anymore. + return slices.Clone(values) } // PostingsStats contains cardinality based statistics for postings. @@ -294,6 +300,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.mtx.Lock() defer p.mtx.Unlock() + affectedLabelNames := map[string]struct{}{} process := func(l labels.Label) { orig := p.m[l.Name][l.Value] repl := make([]storage.SeriesRef, 0, len(orig)) @@ -306,10 +313,7 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.m[l.Name][l.Value] = repl } else { delete(p.m[l.Name], l.Value) - // Delete the key if we removed all values. - if len(p.m[l.Name]) == 0 { - delete(p.m, l.Name) - } + affectedLabelNames[l.Name] = struct{}{} } } @@ -339,6 +343,35 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma } } process(allPostingsKey) + + // Now we need to update the label values slices. + i = 0 + for name := range affectedLabelNames { + i++ + // Same mutex pause as above. + if i%512 == 0 { + p.mtx.Unlock() + p.mtx.RLock() + p.mtx.RUnlock() //nolint:staticcheck // SA2001: this is an intentionally empty critical section. + time.Sleep(time.Millisecond) + p.mtx.Lock() + } + + if len(p.m[name]) == 0 { + // Delete the label name key if we deleted all values. + delete(p.m, name) + delete(p.lvs, name) + continue + } + + // Create the new slice with enough room to grow without reallocating. + // We have deleted values here, so there's definitely some churn, so be prepared for it. + lvs := make([]string, 0, exponentialSliceGrowthFactor*len(p.m[name])) + for v := range p.m[name] { + lvs = append(lvs, v) + } + p.lvs[name] = lvs + } } // Iter calls f for each postings list. It aborts if f returns an error and returns it. @@ -370,7 +403,7 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) { func appendWithExponentialGrowth[T any](a []T, v T) []T { if cap(a) < len(a)+1 { - newList := make([]T, len(a), len(a)*2+1) + newList := make([]T, len(a), len(a)*exponentialSliceGrowthFactor+1) copy(newList, a) a = newList } @@ -383,7 +416,11 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { nm = map[string][]storage.SeriesRef{} p.m[l.Name] = nm } - list := appendWithExponentialGrowth(nm[l.Value], id) + vm, ok := nm[l.Value] + if !ok { + p.lvs[l.Name] = appendWithExponentialGrowth(p.lvs[l.Name], l.Value) + } + list := appendWithExponentialGrowth(vm, id) nm[l.Value] = list if !p.ordered { @@ -402,25 +439,27 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { } func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { - // We'll copy the values into a slice and then match over that, + // We'll take the label values slice and then match over that, // this way we don't need to hold the mutex while we're matching, // which can be slow (seconds) if the match function is a huge regex. // Holding this lock prevents new series from being added (slows down the write path) // and blocks the compaction process. - vals := p.labelValues(name) - for i, count := 0, 1; i < len(vals); count++ { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { + // + // We just need to make sure we don't modify the slice we took, + // so we'll append matching values to a different one. + p.mtx.RLock() + readOnlyLabelValues := p.lvs[name] + p.mtx.RUnlock() + + vals := make([]string, 0, len(readOnlyLabelValues)) + for i, v := range readOnlyLabelValues { + if i%checkContextEveryNIterations == 0 && ctx.Err() != nil { return ErrPostings(ctx.Err()) } - if match(vals[i]) { - i++ - continue + if match(v) { + vals = append(vals, v) } - - // Didn't match, bring the last value to this position, make the slice shorter and check again. - // The order of the slice doesn't matter as it comes from a map iteration. - vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1] } // If none matched (or this label had no values), no need to grab the lock again. @@ -447,27 +486,6 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, return Merge(ctx, its...) } -// labelValues returns a slice of label values for the given label name. -// It will take the read lock. -func (p *MemPostings) labelValues(name string) []string { - p.mtx.RLock() - defer p.mtx.RUnlock() - - e := p.m[name] - if len(e) == 0 { - return nil - } - - vals := make([]string, 0, len(e)) - for v, srs := range e { - if len(srs) > 0 { - vals = append(vals, v) - } - } - - return vals -} - // ExpandPostings returns the postings expanded as a slice. func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) { for p.Next() {