Make PostingsForMatchersCache.PostingsForMatchers respect context cancellation #551

Closed
207 changes: 107 additions & 100 deletions tsdb/postings_for_matchers_cache.go
@@ -3,15 +3,14 @@
import (
"container/list"
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/DmitriyVTitov/size"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/index"
@@ -43,7 +42,7 @@
// If `force` is true, then all requests go through cache, regardless of the `concurrent` param provided to the PostingsForMatchers method.
func NewPostingsForMatchersCache(ttl time.Duration, maxItems int, maxBytes int64, force bool) *PostingsForMatchersCache {
b := &PostingsForMatchersCache{
calls: &sync.Map{},
calls: map[string]*postingsForMatchersPromise{},
cached: list.New(),

ttl: ttl,
@@ -54,9 +53,9 @@
timeNow: time.Now,
postingsForMatchers: PostingsForMatchers,

tracer: otel.Tracer(""),

Check failure (GitHub Actions / lint) on line 56 in tsdb/postings_for_matchers_cache.go: undefined: otel
ttlAttrib: attribute.Stringer("ttl", ttl),

Check failure (GitHub Actions / lint) on line 57 in tsdb/postings_for_matchers_cache.go: undefined: attribute
forceAttrib: attribute.Bool("force", force),

Check failure (GitHub Actions / lint) on line 58 in tsdb/postings_for_matchers_cache.go: undefined: attribute
}

return b
@@ -64,7 +63,7 @@

// PostingsForMatchersCache caches PostingsForMatchers call results when the concurrent hint is passed in or force is true.
type PostingsForMatchersCache struct {
calls *sync.Map
calls map[string]*postingsForMatchersPromise

cachedMtx sync.RWMutex
cached *list.List
@@ -80,120 +79,131 @@
// postingsForMatchers can be replaced for testing purposes
postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)

tracer trace.Tracer

Check failure (GitHub Actions / lint) on line 82 in tsdb/postings_for_matchers_cache.go: undefined: trace
// Preallocated for performance
ttlAttrib attribute.KeyValue

Check failure (GitHub Actions / lint) on line 84 in tsdb/postings_for_matchers_cache.go: undefined: attribute
forceAttrib attribute.KeyValue

Check failure (GitHub Actions / lint) on line 85 in tsdb/postings_for_matchers_cache.go: undefined: attribute
}

func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
ctx, span := c.tracer.Start(ctx, "PostingsForMatchersCache.PostingsForMatchers", trace.WithAttributes(
attribute.Bool("concurrent", concurrent),
c.ttlAttrib,
c.forceAttrib,
))
defer span.End()

if !concurrent && !c.force {
span.AddEvent("cache not used")
p, err := c.postingsForMatchers(ctx, ix, ms...)
if err != nil {
span.SetStatus(codes.Error, "getting postings for matchers without cache failed")
span.RecordError(err)
}
return p, err
return c.postingsForMatchers(ctx, ix, ms...)
}

span.AddEvent("using cache")
c.expire()
p, err := c.postingsForMatchersPromise(ctx, ix, ms)(ctx)
if err != nil {
span.SetStatus(codes.Error, "getting postings for matchers with cache failed")
span.RecordError(err)

key := matchersKey(ms)
// Protect cache state from concurrent write access
c.cachedMtx.RLock()

promise := c.calls[key]
if promise != nil {
// Let's wait on promise to be done
promise.waiting.Inc()
c.cachedMtx.RUnlock()
return c.waitOnPromise(ctx, promise, key)
}

c.cachedMtx.RUnlock()

c.cachedMtx.Lock()
if c.calls[key] != nil {
// A promise has been injected into the cache in the meantime
promise = c.calls[key]
aknuds1 marked this conversation as resolved.
promise.waiting.Inc()
c.cachedMtx.Unlock()

return c.waitOnPromise(ctx, promise, key)
}
return p, err

queryCtx, cancel := context.WithCancelCause(context.Background())
promise = &postingsForMatchersPromise{
done: make(chan int64),
waiting: atomic.NewInt32(1),
cancel: cancel,
}
c.calls[key] = promise
c.cachedMtx.Unlock()

go func() {
Contributor: Should we move this to its own method on postingsForMatchersPromise perhaps? This method is starting to get quite big.
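A rough sketch of the extraction the reviewer has in mind; the method name `fulfill` and its parameter list are hypothetical, not part of this PR, and the body is just the goroutine below lifted onto the promise type:

```go
// fulfill runs the wrapped PostingsForMatchers call and publishes the result
// on p.done. Sketch only: name and signature are illustrative.
func (p *postingsForMatchersPromise) fulfill(
	ctx context.Context,
	key string,
	ix IndexPostingsReader,
	postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error),
	ms ...*labels.Matcher,
) {
	// Close promise channel after fulfilment.
	defer close(p.done)

	postings, err := postingsForMatchers(ctx, ix, ms...)
	if err != nil {
		p.err = err
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			// Canceled queries will be pruned from the cache.
			return
		}
	} else {
		if ctx.Err() != nil {
			// Canceled queries will be pruned from the cache.
			return
		}
		p.cloner = index.NewPostingsCloner(postings)
	}

	// Report the cache entry's size so a waiter can record it.
	p.done <- int64(len(key) + size.Of(p))
}
```

The call site would then reduce to `go promise.fulfill(queryCtx, key, ix, c.postingsForMatchers, ms...)`.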

// Close promise channel after fulfilment
defer close(promise.done)

// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
postings, err := c.postingsForMatchers(queryCtx, ix, ms...)
if err != nil {
promise.err = err
if errors.Is(promise.err, context.DeadlineExceeded) || errors.Is(promise.err, context.Canceled) {
// Canceled queries will be pruned from the cache
return
}
} else {
if queryCtx.Err() != nil {
// Canceled queries will be pruned from the cache
return
}

promise.cloner = index.NewPostingsCloner(postings)
}

sizeBytes := int64(len(key) + size.Of(promise))
promise.done <- sizeBytes
}()

return c.waitOnPromise(ctx, promise, key)
}

type postingsForMatcherPromise struct {
done chan struct{}
type postingsForMatchersPromise struct {
// done signals when the promise is fulfilled, sending the cache entry size for storing by a consumer
done chan int64
waiting *atomic.Int32
cancel context.CancelCauseFunc

cloner *index.PostingsCloner
err error
}

func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
span := trace.SpanFromContext(ctx)

// waitOnPromise waits until either the context is done or the promise is.
// If the context is done, the number of waiting is decremented, and the promise gets canceled if the
// number of waiting has reached 0.
func (c *PostingsForMatchersCache) waitOnPromise(ctx context.Context, promise *postingsForMatchersPromise, key string) (index.Postings, error) {
select {
case <-ctx.Done():
span.AddEvent("interrupting wait on postingsForMatchers promise due to context error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err()
case <-p.done:
// The request was canceled, reduce the waiting count
c.cachedMtx.Lock()
waiting := promise.waiting.Dec()
if waiting > 0 {
// Promise is in use by other goroutines
c.cachedMtx.Unlock()
return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
}

// There are no more waiting goroutines, cancel the promise and remove it from the cache
promise.cancel(fmt.Errorf("no remaining callers interested in query"))
delete(c.calls, key)
Contributor: I don't think this is quite right - calling delete here will remove the promise from the cache if a subsequent call's context is cancelled, even if the promise succeeded. Imagine this sequence of events:

  1. Initial call for a set of matchers arrives, promise succeeds and value is cached and returned.
  2. Subsequent call arrives with a context that is already cancelled. Cached promise is retrieved from cache, waiting is incremented to 1 and then waitOnPromise is called.
  3. waitOnPromise observes cancelled context, decrements waiting to 0 and removes cached promise from cache.

Instead, we only want to cancel and remove the call from c.calls if there's nothing waiting and the call is still in progress.
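A minimal sketch of the guard described above. The `fulfilled` flag and the helper name `releaseWaiter` are assumptions for illustration; the fulfilling goroutine would have to set such a flag under cachedMtx once it caches a result:

```go
// releaseWaiter illustrates the suggested guard: only cancel and evict the
// promise when nobody is waiting anymore AND the underlying call has not
// completed yet.
func (c *PostingsForMatchersCache) releaseWaiter(promise *postingsForMatchersPromise, key string) {
	c.cachedMtx.Lock()
	defer c.cachedMtx.Unlock()

	if promise.waiting.Dec() > 0 {
		// Other goroutines still want the result.
		return
	}
	if promise.fulfilled {
		// The call already succeeded; keep the cached entry until TTL/size eviction.
		return
	}
	// Nothing is waiting and the call is still in progress: cancel and drop it.
	promise.cancel(fmt.Errorf("no remaining callers interested in query"))
	delete(c.calls, key)
}
```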

c.cachedMtx.Unlock()

// Wait for query execution goroutine to finish
<-promise.done
Contributor: what's the reason for waiting here?

Contributor Author: Maybe it's not necessary any longer, have to review it.

Contributor Author: Yeah, it shouldn't be necessary any longer (it was in a previous revision).

Contributor Author: Actually, after removing this line I get a data race in tests. Not sure why yet.

Contributor Author (aknuds1, Oct 25, 2023): One reason to read from promise.done here is that otherwise the background goroutine will block on sending to the channel.

Contributor: I see. Then we're holding the whole request until the PostingsForMatchers call finishes. Can we have a select with default or with case <-ctx.Done() when sending to promise.done? Then we can immediately return the cancelled request and let the background goroutine clean up when it's ready.
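A sketch of what that suggestion could look like for the final send in the fulfilling goroutine (replacing the plain `promise.done <- sizeBytes`); this is the reviewer's idea rendered as code, not code from the PR:

```go
// Don't block forever if every caller has already given up.
select {
case promise.done <- sizeBytes:
	// A waiter received the size and will record the cache entry.
case <-queryCtx.Done():
	// All interested callers cancelled; nobody will ever read from
	// promise.done, so exit and let the promise be pruned from the cache.
}
```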

Contributor: I think we can solve the blocking on promise.done by moving the call to c.created to the goroutine, and promise.done is used only to indicate completion (see comment below) - then promise.done is only ever closed, and never sent to.
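A minimal sketch of that alternative, assuming `done` reverts to `chan struct{}` and the goroutine records the cache entry itself:

```go
// Hypothetical close-only variant: done is purely a completion signal.
go func() {
	defer close(promise.done) // nothing is ever sent on done

	postings, err := c.postingsForMatchers(queryCtx, ix, ms...)
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || queryCtx.Err() != nil {
		// Canceled queries will be pruned from the cache.
		promise.err = err
		return
	}
	if err != nil {
		promise.err = err
	} else {
		promise.cloner = index.NewPostingsCloner(postings)
	}

	// Record the cache entry here instead of handing its size to a waiter.
	c.created(key, c.timeNow(), int64(len(key)+size.Of(promise)))
}()
```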

return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
case sizeBytes := <-promise.done:
// Checking context error is necessary for deterministic tests,
// as channel selection order is random
if ctx.Err() != nil {
span.AddEvent("completed postingsForMatchers promise, but context has error", trace.WithAttributes(
attribute.String("err", ctx.Err().Error()),
))
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
}
if p.err != nil {
span.AddEvent("postingsForMatchers promise completed with error", trace.WithAttributes(
attribute.String("err", p.err.Error()),
))
return nil, p.err
}
span.AddEvent("postingsForMatchers promise completed successfully")
return p.cloner.Clone(), nil
}
}

func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
span := trace.SpanFromContext(ctx)

promise := &postingsForMatcherPromise{
done: make(chan struct{}),
}
if sizeBytes > 0 {
// Got the promise cache entry's size, store it
c.created(key, c.timeNow(), sizeBytes)
}
Comment on lines +196 to +199 - Contributor: Couldn't the call to c.created be made in the goroutine spawned by PostingsForMatchers? Then done can simply be closed to signal that the promise has completed.
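Under that close-only design, the done branch of waitOnPromise would shrink to something like this sketch (again assuming `done` is a plain `chan struct{}`):

```go
case <-promise.done:
	// The promise is fulfilled; the goroutine already recorded the cache entry.
	if ctx.Err() != nil {
		// Channel selection order is random, so re-check the context.
		return nil, errors.Wrap(ctx.Err(), "PostingsForMatchers context error")
	}
	if promise.err != nil {
		return nil, promise.err
	}
	return promise.cloner.Clone(), nil
```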


key := matchersKey(ms)
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded {
// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
attribute.String("cache_key", key),
))
close(promise.done)
return oldPromise.(*postingsForMatcherPromise).result
}
if promise.err != nil {
return nil, promise.err
}

span.AddEvent("no postingsForMatchers promise in cache, executing query")

// promise was stored, close its channel after fulfilment
defer close(promise.done)

// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
// cancelled their context?
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
span.AddEvent("postingsForMatchers failed", trace.WithAttributes(
attribute.String("cache_key", key),
attribute.String("err", err.Error()),
))
promise.err = err
} else {
span.AddEvent("postingsForMatchers succeeded", trace.WithAttributes(
attribute.String("cache_key", key),
))
promise.cloner = index.NewPostingsCloner(postings)
return promise.cloner.Clone(), nil
}

sizeBytes := int64(len(key) + size.Of(promise))

c.created(ctx, key, c.timeNow(), sizeBytes)
return promise.result
}

type postingsForMatchersCachedCall struct {
@@ -244,33 +254,30 @@
func (c *PostingsForMatchersCache) evictHead() {
front := c.cached.Front()
oldest := front.Value.(*postingsForMatchersCachedCall)
c.calls.Delete(oldest.key)
delete(c.calls, oldest.key)
c.cached.Remove(front)
c.cachedBytes -= oldest.sizeBytes
}

// created has to be called when returning from the PostingsForMatchers call that creates the promise.
// the ts provided should be the call time.
func (c *PostingsForMatchersCache) created(ctx context.Context, key string, ts time.Time, sizeBytes int64) {
span := trace.SpanFromContext(ctx)
func (c *PostingsForMatchersCache) created(key string, ts time.Time, sizeBytes int64) {
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()

if c.ttl <= 0 {
span.AddEvent("deleting cached promise since c.ttl <= 0")
c.calls.Delete(key)
delete(c.calls, key)
return
}

c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()

c.cached.PushBack(&postingsForMatchersCachedCall{
key: key,
ts: ts,
sizeBytes: sizeBytes,
})
c.cachedBytes += sizeBytes
span.AddEvent("added cached value to expiry queue", trace.WithAttributes(

Check failure (GitHub Actions / lint) on line 279 in tsdb/postings_for_matchers_cache.go: undefined: span; undefined: trace
attribute.Stringer("timestamp", ts),

Check failure (GitHub Actions / lint) on line 280 in tsdb/postings_for_matchers_cache.go: undefined: attribute
attribute.Int64("size in bytes", sizeBytes),
attribute.Int64("cached bytes", c.cachedBytes),
))