diff --git a/storage/merge.go b/storage/merge.go index 452c891437..0db63c992c 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -497,7 +497,12 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { c.consecutive = false c.h = samplesIteratorHeap{} for _, iter := range c.iterators { - if iter.Seek(t) != chunkenc.ValNone { + if iter.Seek(t) == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + return chunkenc.ValNone + } + } else { heap.Push(&c.h, iter) } } @@ -571,7 +576,13 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { // So, we don't call Next() on it here. c.curr = c.iterators[0] for _, iter := range c.iterators[1:] { - if iter.Next() != chunkenc.ValNone { + if iter.Next() == chunkenc.ValNone { + if iter.Err() != nil { + // If any iterator is reporting an error, abort. + // If c.iterators[0] is reporting an error, we'll handle that below. + return chunkenc.ValNone + } + } else { heap.Push(&c.h, iter) } } @@ -583,7 +594,17 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { for { currValueType = c.curr.Next() - if currValueType != chunkenc.ValNone { + + if currValueType == chunkenc.ValNone { + if c.curr.Err() != nil { + // Abort if we've hit an error. + return chunkenc.ValNone + } else if len(c.h) == 0 { + // No iterator left to iterate. + c.curr = nil + return chunkenc.ValNone + } + } else { currT = c.curr.AtT() if currT == c.lastT { // Ignoring sample for the same timestamp. @@ -603,10 +624,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } // Current iterator does not hold the smallest timestamp. heap.Push(&c.h, c.curr) - } else if len(c.h) == 0 { - // No iterator left to iterate. - c.curr = nil - return chunkenc.ValNone } c.curr = heap.Pop(&c.h).(chunkenc.Iterator) diff --git a/storage/merge_test.go b/storage/merge_test.go index 4252852376..0564913a8e 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1144,6 +1144,35 @@ func TestChainSampleIteratorSeek(t *testing.T) { } } +func TestChainSampleIteratorSeekFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Seek(0)) + require.EqualError(t, merged.Err(), "something went wrong") +} + +func TestChainSampleIteratorNextImmediatelyFailingIterator(t *testing.T) { + merged := ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + errIterator{errors.New("something went wrong")}, + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") + + // Next() does some special handling for the first iterator, so make sure it handles the first iterator returning an error too. + merged = ChainSampleIteratorFromIterators(nil, []chunkenc.Iterator{ + errIterator{errors.New("something went wrong")}, + NewListSeriesIterator(samples{fSample{0, 0.1}, fSample{1, 1.1}, fSample{2, 2.1}}), + }) + + require.Equal(t, chunkenc.ValNone, merged.Next()) + require.EqualError(t, merged.Err(), "something went wrong") +} + func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { for sampleType, sampleFunc := range map[string]func(int64, histogram.CounterResetHint) chunks.Sample{ "histogram": func(ts int64, hint histogram.CounterResetHint) chunks.Sample { return histogramSample(ts, hint) }, @@ -1539,3 +1568,35 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { }) } } + +type errIterator struct { + err error +} + +func (e errIterator) Next() chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) Seek(t int64) chunkenc.ValueType { + return chunkenc.ValNone +} + +func (e errIterator) At() (int64, float64) { + return 0, 0 +} + +func (e errIterator) AtHistogram() (int64, *histogram.Histogram) { + return 0, nil +} + +func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { + return 0, nil +} + +func (e errIterator) AtT() int64 { + return 0 +} + +func (e errIterator) Err() error { + return e.err +}