From c3b4adae7d8ac6518ab7dcc39f8de0f928376144 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Sun, 17 Nov 2024 19:14:23 +0100 Subject: [PATCH] kafka replay speed: don't trim fetchWants (#9919) * kafka replay speed: don't trim fetchWants I realized that trimming `fetchWant`s can end up discarding offsets in extreme circumstances. ### How it works If the fetchWant is so big that its size would exceed 2GiB, then we trim it. We trim it by reducing the end offset. The idea is that the next fetchWant will pick up from where this one left off. ### How it can break We trim the `fetchWant` in `UpdateBytesPerRecord` too. `UpdateBytesPerRecord` can be invoked in `concurrentFEtchers.run` after the `fetchWant` is dispatched. In that case the next `fetchWant` would have already been calculated. And we would end up with a gap. ### Did it break? It's hard to tell, but it's very unlikely. To reach 2GiB we would have needed to have the estimation for bytes per record be 2 MiB. While these large records are possible, they should be rare and our rolling average estimation for records size shouldn't reach it. Signed-off-by: Dimitar Dimitrov * Add tests for MaxBytes and UpdateBytesPerRecord Signed-off-by: Dimitar Dimitrov * Change expectedBytes to be int64 to be able to run on 64-bit systems Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/fetcher.go | 22 ++----- pkg/storage/ingest/fetcher_test.go | 101 +++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 303da0adb1..710c7f5a6d 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -88,14 +88,14 @@ func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fe func (w fetchWant) Next() fetchWant { n := fetchWantFrom(w.endOffset, w.targetMaxBytes, w.estimatedBytesPerRecord) n.estimatedBytesPerRecord = w.estimatedBytesPerRecord - return n.trimIfTooBig() + return n } // MaxBytes returns the maximum number of bytes we can fetch in a single request. // It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB. func (w fetchWant) MaxBytes() int32 { fetchBytes := w.expectedBytes() - if fetchBytes > math.MaxInt32 { + if fetchBytes > math.MaxInt32 || fetchBytes < 0 { // This shouldn't happen because w should have been trimmed before sending the request. // But we definitely don't want to request negative bytes by casting to int32, so add this safeguard. return math.MaxInt32 @@ -114,28 +114,16 @@ func (w fetchWant) UpdateBytesPerRecord(lastFetchBytes int, lastFetchNumberOfRec actualBytesPerRecord := float64(lastFetchBytes) / float64(lastFetchNumberOfRecords) w.estimatedBytesPerRecord = int(currentEstimateWeight*float64(w.estimatedBytesPerRecord) + (1-currentEstimateWeight)*actualBytesPerRecord) - return w.trimIfTooBig() + return w } // expectedBytes returns how many bytes we'd need to accommodate the range of offsets using estimatedBytesPerRecord. // They may be more than the kafka protocol supports (> MaxInt32). Use MaxBytes. -func (w fetchWant) expectedBytes() int { +func (w fetchWant) expectedBytes() int64 { // We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request. // Based on some testing 65% of under-estimations are by less than 5%. So we account for that. const overFetchBytesFactor = 1.05 - return int(overFetchBytesFactor * float64(w.estimatedBytesPerRecord*int(w.endOffset-w.startOffset))) -} - -// trimIfTooBig adjusts the end offset if we expect to fetch too many bytes. -// It's capped at math.MaxInt32 bytes. -func (w fetchWant) trimIfTooBig() fetchWant { - if w.expectedBytes() <= math.MaxInt32 { - return w - } - // We are overflowing, so we need to trim the end offset. - // We do this by calculating how many records we can fetch with the max bytes, and then setting the end offset to that. - w.endOffset = w.startOffset + int64(math.MaxInt32/w.estimatedBytesPerRecord) - return w + return int64(overFetchBytesFactor * float64(int64(w.estimatedBytesPerRecord)*(w.endOffset-w.startOffset))) } type fetchResult struct { diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index ace16bd367..08412f2094 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math" "sync" "testing" "time" @@ -1220,3 +1221,103 @@ func (w waiterFunc) Wait() { w() } type refresherFunc func() func (r refresherFunc) ForceMetadataRefresh() { r() } + +func TestFetchWant_MaxBytes(t *testing.T) { + testCases := map[string]struct { + fw fetchWant + expected int32 + }{ + "small fetch": { + fw: fetchWant{ + startOffset: 0, + endOffset: 10, + estimatedBytesPerRecord: 100, + }, + expected: 1_000_000, // minimum fetch size + }, + "medium fetch": { + fw: fetchWant{ + startOffset: 0, + endOffset: 1000, + estimatedBytesPerRecord: 1000, + }, + expected: 1_050_000, // 1000 * 1000 * 1.05 + }, + "huge fetch with huge bytes per record; overflow risk": { + fw: fetchWant{ + startOffset: 0, + endOffset: 2 << 31, + estimatedBytesPerRecord: 2 << 30, + }, + expected: math.MaxInt32, + }, + "negative product due to overflow": { + fw: fetchWant{ + startOffset: 0, + endOffset: math.MaxInt64, + estimatedBytesPerRecord: math.MaxInt32, + }, + expected: math.MaxInt32, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := tc.fw.MaxBytes() + assert.Equal(t, tc.expected, result) + assert.GreaterOrEqual(t, result, int32(0), "MaxBytes should never return negative values") + }) + } +} + +func TestFetchWant_UpdateBytesPerRecord(t *testing.T) { + baseWant := fetchWant{ + startOffset: 100, + endOffset: 200, + estimatedBytesPerRecord: 1000, + } + + testCases := map[string]struct { + lastFetchBytes int + lastFetchRecords int + expectedBytesPerRecord int + }{ + "similar to estimate": { + lastFetchBytes: 10000, + lastFetchRecords: 10, + expectedBytesPerRecord: 1000, + }, + "much larger than estimate": { + lastFetchBytes: 100000, + lastFetchRecords: 10, + expectedBytesPerRecord: 2800, + }, + "much smaller than estimate": { + lastFetchBytes: 1000, + lastFetchRecords: 10, + expectedBytesPerRecord: 820, + }, + "risk of overflow": { + lastFetchBytes: math.MaxInt64, + lastFetchRecords: 1, + expectedBytesPerRecord: math.MaxInt64/5 + int(float64(baseWant.estimatedBytesPerRecord)*0.8), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := baseWant.UpdateBytesPerRecord(tc.lastFetchBytes, tc.lastFetchRecords) + + assert.Equal(t, baseWant.startOffset, result.startOffset, "startOffset should not change") + assert.Equal(t, baseWant.endOffset, result.endOffset, "endOffset should not change") + + // Check the new bytes per record estimation. Because of large numbers and floats we allow for 0.1% error. + assert.InEpsilon(t, tc.expectedBytesPerRecord, result.estimatedBytesPerRecord, 0.001) + + // Verify MaxBytes() doesn't overflow or return negative values + maxBytes := result.MaxBytes() + assert.GreaterOrEqual(t, maxBytes, int32(0), "MaxBytes should never return negative values") + assert.LessOrEqual(t, maxBytes, int32(math.MaxInt32), "MaxBytes should never exceed MaxInt32") + }) + } +}