From dcc1397eb1fcef76801419c8253301e6620715af Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 15 Nov 2024 18:19:00 +0200 Subject: [PATCH] Add tests for MaxBytes and UpdateBytesPerRecord Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/fetcher.go | 4 +- pkg/storage/ingest/fetcher_test.go | 102 +++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index c4e72017d6..6c72c78b7a 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -95,7 +95,7 @@ func (w fetchWant) Next() fetchWant { // It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB. func (w fetchWant) MaxBytes() int32 { fetchBytes := w.expectedBytes() - if fetchBytes > math.MaxInt32 { + if fetchBytes > math.MaxInt32 || fetchBytes < 0 { // This shouldn't happen because w should have been trimmed before sending the request. // But we definitely don't want to request negative bytes by casting to int32, so add this safeguard. return math.MaxInt32 @@ -123,7 +123,7 @@ func (w fetchWant) expectedBytes() int { // We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request. // Based on some testing 65% of under-estimations are by less than 5%. So we account for that. 
const overFetchBytesFactor = 1.05 - return int(overFetchBytesFactor * float64(w.estimatedBytesPerRecord*int(w.endOffset-w.startOffset))) + return int(overFetchBytesFactor * float64(int64(w.estimatedBytesPerRecord)*(w.endOffset-w.startOffset))) } type fetchResult struct { diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index ace16bd367..e5118c40eb 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math" "sync" "testing" "time" @@ -1220,3 +1221,104 @@ func (w waiterFunc) Wait() { w() } type refresherFunc func() func (r refresherFunc) ForceMetadataRefresh() { r() } + +func TestFetchWant_MaxBytes(t *testing.T) { + testCases := map[string]struct { + fw fetchWant + expected int32 + }{ + "small fetch": { + fw: fetchWant{ + startOffset: 0, + endOffset: 10, + estimatedBytesPerRecord: 100, + }, + expected: 1_000_000, // minimum fetch size + }, + "medium fetch": { + fw: fetchWant{ + startOffset: 0, + endOffset: 1000, + estimatedBytesPerRecord: 1000, + }, + expected: 1_050_000, // 1000 * 1000 * 1.05 + }, + "huge fetch with huge bytes per record; overflow risk": { + fw: fetchWant{ + startOffset: 0, + endOffset: 2 << 31, + estimatedBytesPerRecord: 2 << 30, + }, + expected: math.MaxInt32, + }, + "negative product due to overflow": { + fw: fetchWant{ + startOffset: 0, + endOffset: math.MaxInt64, + estimatedBytesPerRecord: math.MaxInt32, + }, + expected: math.MaxInt32, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := tc.fw.MaxBytes() + assert.Equal(t, tc.expected, result) + assert.GreaterOrEqual(t, result, int32(0), "MaxBytes should never return negative values") + }) + } +} + +func TestFetchWant_UpdateBytesPerRecord(t *testing.T) { + baseWant := fetchWant{ + startOffset: 100, + endOffset: 200, + estimatedBytesPerRecord: 1000, + } + + testCases := map[string]struct { + lastFetchBytes int + lastFetchRecords int + 
expectedBytesPerRecord int + }{ + "similar to estimate": { + lastFetchBytes: 10000, + lastFetchRecords: 10, + expectedBytesPerRecord: 1000, + }, + "much larger than estimate": { + lastFetchBytes: 100000, + lastFetchRecords: 10, + expectedBytesPerRecord: 2800, + }, + "much smaller than estimate": { + lastFetchBytes: 1000, + lastFetchRecords: 10, + expectedBytesPerRecord: 820, + }, + "risk of overflow": { + lastFetchBytes: math.MaxInt64, + lastFetchRecords: 1, + expectedBytesPerRecord: math.MaxInt64/5 + int(float64(baseWant.estimatedBytesPerRecord)*0.8), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := baseWant.UpdateBytesPerRecord(tc.lastFetchBytes, tc.lastFetchRecords) + // For "risk of overflow" the expected estimate is math.MaxInt64/5 + 800 = 1844674407370955961. + // startOffset and endOffset should never change + assert.Equal(t, baseWant.startOffset, result.startOffset, "startOffset should not change") + assert.Equal(t, baseWant.endOffset, result.endOffset, "endOffset should not change") + + // Check the new bytes per record estimation. Because of large numbers and floats we allow for 0.1% error. + assert.InEpsilon(t, tc.expectedBytesPerRecord, result.estimatedBytesPerRecord, 0.001) + + // Verify MaxBytes() doesn't overflow or return negative values + maxBytes := result.MaxBytes() + assert.GreaterOrEqual(t, maxBytes, int32(0), "MaxBytes should never return negative values") + assert.LessOrEqual(t, maxBytes, int32(math.MaxInt32), "MaxBytes should never exceed MaxInt32") + }) + } +}