Skip to content

Commit

Permalink
Merge pull request #546 from grafana/charleskorn/postingsformatchersc…
Browse files Browse the repository at this point in the history
…ache-context-cancellation

Fix issue where `PostingsForMatchersCache` can cache a context cancellation error
  • Loading branch information
aknuds1 authored Oct 20, 2023
2 parents 97933fb + efcd876 commit d0d6240
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 13 deletions.
42 changes: 29 additions & 13 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,37 +78,53 @@ func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix I
return c.postingsForMatchers(ctx, ix, ms...)
}
c.expire()
return c.postingsForMatchersPromise(ctx, ix, ms)()
return c.postingsForMatchersPromise(ix, ms)(ctx)
}

type postingsForMatcherPromise struct {
sync.WaitGroup
done chan struct{}

cloner *index.PostingsCloner
err error
}

func (p *postingsForMatcherPromise) result() (index.Postings, error) {
p.Wait()
if p.err != nil {
return nil, p.err
func (p *postingsForMatcherPromise) result(ctx context.Context) (index.Postings, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-p.done:
// Checking context error is necessary for deterministic tests,
// as channel selection order is random
if ctx.Err() != nil {
return nil, ctx.Err()
}
if p.err != nil {
return nil, p.err
}
return p.cloner.Clone(), nil
}
return p.cloner.Clone(), nil
}

func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
func (c *PostingsForMatchersCache) postingsForMatchersPromise(ix IndexPostingsReader, ms []*labels.Matcher) func(context.Context) (index.Postings, error) {
promise := new(postingsForMatcherPromise)
promise.Add(1)
promise.done = make(chan struct{})

key := matchersKey(ms)
oldPromise, loaded := c.calls.LoadOrStore(key, promise)
if loaded {
promise = oldPromise.(*postingsForMatcherPromise)
return promise.result
// promise was not stored, we return a previously stored promise, that's possibly being fulfilled in another goroutine
close(promise.done)
return oldPromise.(*postingsForMatcherPromise).result
}
defer promise.Done()

if postings, err := c.postingsForMatchers(ctx, ix, ms...); err != nil {
// promise was stored, close its channel after fulfilment
defer close(promise.done)

// Don't let context cancellation fail the promise, since it may be used by multiple goroutines, each with
// its own context. Also, keep the call independent of this particular context, since the promise will be reused.
// FIXME: do we need to cancel the call to postingsForMatchers if all the callers waiting for the result have
// cancelled their context?
if postings, err := c.postingsForMatchers(context.Background(), ix, ms...); err != nil {
promise.err = err
} else {
promise.cloner = index.NewPostingsCloner(postings)
Expand Down
23 changes: 23 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,29 @@ func TestPostingsForMatchersCache(t *testing.T) {
require.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[2])])
require.Equal(t, 1, callsPerMatchers[matchersKey(matchersLists[3])])
})

t.Run("initial request context is cancelled, second request is not cancelled", func(t *testing.T) {
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
expectedPostings := index.NewListPostings(nil)

c := newPostingsForMatchersCache(time.Hour, 5, 1000, func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

return expectedPostings, nil
}, &timeNowMock{}, false)

ctx1, cancel := context.WithCancel(context.Background())
cancel()
_, err := c.PostingsForMatchers(ctx1, indexForPostingsMock{}, true, matchers...)
require.ErrorIs(t, err, context.Canceled)

ctx2 := context.Background()
actualPostings, err := c.PostingsForMatchers(ctx2, indexForPostingsMock{}, true, matchers...)
require.NoError(t, err)
require.Equal(t, expectedPostings, actualPostings)
})
}

func BenchmarkPostingsForMatchersCache(b *testing.B) {
Expand Down

0 comments on commit d0d6240

Please sign in to comment.