diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go
index bb4aba661e..43e3b77874 100644
--- a/tsdb/postings_for_matchers_cache.go
+++ b/tsdb/postings_for_matchers_cache.go
@@ -3,15 +3,14 @@ package tsdb
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/DmitriyVTitov/size"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	"go.opentelemetry.io/otel/trace"
+	"github.com/pkg/errors"
+	"go.uber.org/atomic"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb/index"
@@ -43,7 +42,7 @@ type IndexPostingsReader interface {
 // If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided to the PostingsForMatchers method.
 func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool) *PostingsForMatchersCache {
 	b := &PostingsForMatchersCache{
-		calls:  &sync.Map{},
+		calls:  map[string]*postingsForMatchersPromise{},
 		cached: list.New(),
 
 		ttl: ttl,
@@ -64,7 +63,7 @@ func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64
 
 // PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in or force is true.
 type PostingsForMatchersCache struct {
-	calls *sync.Map
+	calls map[string]*postingsForMatchersPromise
 
 	cachedMtx sync.RWMutex
 	cached    *list.List
@@ -87,113 +86,124 @@ type PostingsForMatchersCache struct {
 }
 
 func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
-	ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes(
-		attribute.Bool("concurrent", concurrent),
-		c.ttlAttrib,
-		c.forceAttrib,
-	))
-	defer span.End()
-
 	if !concurrent && !c.force {
-		span.AddEvent("cache not used")
-		p, err := c.postingsForMatchers(ctx, ix, ms...)
-		if err != nil {
-			span.SetStatus(codes.Error, "getting postings for matchers without cache failed")
-			span.RecordError(err)
-		}
-		return p, err
+		return c.postingsForMatchers(ctx, ix, ms...)
 	}
-
-	span.AddEvent("using cache")
 	c.expire()
-	p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx)
-	if err != nil {
-		span.SetStatus(codes.Error, "getting postings for matchers with cache failed")
-		span.RecordError(err)
+
+	key := matchersKey(ms)
+	// Protect cache state from concurrent write access
+	c.cachedMtx.RLock()
+
+	promise := c.calls[key]
+	if promise != nil {
+		// Let's wait on promise to be done
+		promise.waiting.Inc()
+		c.cachedMtx.RUnlock()
+		return c.waitOnPromise(ctx, promise, key)
+	}
+
+	c.cachedMtx.RUnlock()
+
+	c.cachedMtx.Lock()
+	if c.calls[key] != nil {
+		// A promise has been injected into the cache in the meantime
+		promise = c.calls[key]
+		promise.waiting.Inc()
+		c.cachedMtx.Unlock()
+
+		return c.waitOnPromise(ctx, promise, key)
 	}
-	return p, err
+
+	queryCtx, cancel := context.WithCancelCause(context.Background())
+	promise = &postingsForMatchersPromise{
+		done:    make(chan int64),
+		waiting: atomic.NewInt32(1),
+		cancel:  cancel,
+	}
+	c.calls[key] = promise
+	c.cachedMtx.Unlock()
+
+	go func() {
+		// Close promise channel after fulfilment
+		defer close(promise.done)
+
+		// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
+		// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
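+		// Note: queryCtx is deliberately detached from the callers' contexts; it is only canceled via
+		// promise.cancel() in waitOnPromise, once the last caller still waiting on this promise has given up.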
+		postings, err := c.postingsForMatchers(queryCtx, ix, ms...)
+		if err != nil {
+			promise.err = err
+			if errors.Is(promise.err, context.DeadlineExceeded) || errors.Is(promise.err, context.Canceled) {
+				// Canceled queries will be pruned from the cache
+				return
+			}
+		} else {
+			if queryCtx.Err() != nil {
+				// Canceled queries will be pruned from the cache
+				return
+			}
+
+			promise.cloner = index.NewPostingsCloner(postings)
+		}
+
+		sizeBytes := int64(len(key) + size.Of(promise))
+		promise.done <- sizeBytes
+	}()
+
+	return c.waitOnPromise(ctx, promise, key)
 }
 
-type postingsForMatcherPromise struct {
-	done chan struct{}
+type postingsForMatchersPromise struct {
+	// done signals when the promise is fulfilled, sending the cache entry size for storing by a consumer
+	done    chan int64
+	waiting *atomic.Int32
+	cancel  context.CancelCauseFunc
 
 	cloner *index.PostingsCloner
 	err    error
 }
 
-func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
-	span := trace.SpanFromContext(ctx)
-
+// waitOnPromise waits until either the context is done or the promise is.
+// If the context is done, the number of waiting is decremented, and the promise gets canceled if the
+// number of waiting has reached 0.
+func (c *PostingsForMatchersCache) waitOnPromise(ctx context.Context, promise *postingsForMatchersPromise, key string) (index.Postings, error) {
 	select {
 	case <-ctx.Done():
-		span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes(
-			attribute.String("err", ctx.Err().Error()),
-		))
-		return nil, ctx.Err()
-	case <-p.done:
+		// The request was canceled, reduce the waiting count
+		c.cachedMtx.Lock()
+		waiting := promise.waiting.Dec()
+		if waiting > 0 {
+			// Promise is in use by other goroutines
+			c.cachedMtx.Unlock()
+			return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
+		}
+
+		// There are no more waiting goroutines, cancel the promise and remove it from the cache
+		promise.cancel(fmt.Errorf("no remaining callers interested in query"))
+		delete(c.calls, key)
+		c.cachedMtx.Unlock()
+
+		// Wait for query execution goroutine to finish
+		<-promise.done
+		return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
+	case sizeBytes := <-promise.done:
 		// Checking context error is necessary for deterministic tests,
 		// as channel selection order is random
 		if ctx.Err() != nil {
-			span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes(
-				attribute.String("err", ctx.Err().Error()),
-			))
-			return nil, ctx.Err()
+			return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
 		}
-		if p.err != nil {
-			span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes(
-				attribute.String("err", p.err.Error()),
-			))
-			return nil, p.err
-		}
-		span.AddEvent("postingsForMatchers promise completed successfully")
-		return p.cloner.Clone(), nil
-	}
-}
 
-func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
-	span := trace.SpanFromContext(ctx)
-
-	promise := &postingsForMatcherPromise{
-		done: make(chan struct{}),
-	}
+		if sizeBytes > 0 {
+			// Got the promise cache entry's size, store it
+			c.created(key, c.timeNow(), sizeBytes)
		}
 
-	key := matchersKey(ms)
-	oldPromise, loaded := c.calls.LoadOrStore(key, promise)
-	if loaded {
-		// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
-		span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
-			attribute.String("cache_key", key),
-		))
-		close(promise.done)
-		return oldPromise.(*postingsForMatcherPromise).result
-	}
+		if promise.err != nil {
+			return nil, promise.err
+		}
 
-	span.AddEvent("no postingsForMatchers promise in cache, executing query")
-
-	// promise was stored, close its channel after fulfilment
-	defer close(promise.done)
-
-	// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
-	// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
-	// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
-	// cancelled their context?
-	if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
-		span.AddEvent("postingsForMatchers failed", trace.WithAttributes(
-			attribute.String("cache_key", key),
-			attribute.String("err", err.Error()),
-		))
-		promise.err = err
-	} else {
-		span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes(
-			attribute.String("cache_key", key),
-		))
-		promise.cloner = index.NewPostingsCloner(postings)
+		return promise.cloner.Clone(), nil
 	}
-
-	sizeBytes := int64(len(key) + size.Of(promise))
-
-	c.created(ctx, key, c.timeNow(), sizeBytes)
-	return promise.result
 }
 
 type postingsForMatchersCachedCall struct {
@@ -244,25 +254,22 @@ func (c *PostingsForMatchersCache) shouldEvictHead() bool {
 func (c *PostingsForMatchersCache) evictHead() {
 	front := c.cached.Front()
 	oldest := front.Value.(*postingsForMatchersCachedCall)
-	c.calls.Delete(oldest.key)
+	delete(c.calls, oldest.key)
 	c.cached.Remove(front)
 	c.cachedBytes -= oldest.sizeBytes
 }
 
 // created has to be called when returning from the PostingsForMatchers call that creates the promise.
 // the ts provided should be the call time.
-func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
-	span := trace.SpanFromContext(ctx)
+func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) {
+	c.cachedMtx.Lock()
+	defer c.cachedMtx.Unlock()
 
 	if c.ttl <= 0 {
-		span.AddEvent("deleting cached promise since c.ttl <= 0")
-		c.calls.Delete(key)
+		delete(c.calls, key)
 		return
 	}
 
-	c.cachedMtx.Lock()
-	defer c.cachedMtx.Unlock()
-
 	c.cached.PushBack(&postingsForMatchersCachedCall{
 		key:       key,
 		ts:        ts,