Skip to content

Commit

Permalink
Add tests for MaxBytes and UpdateBytesPerRecord
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov committed Nov 15, 2024
1 parent 175bef4 commit dcc1397
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (w fetchWant) Next() fetchWant {
// It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB.
func (w fetchWant) MaxBytes() int32 {
fetchBytes := w.expectedBytes()
if fetchBytes > math.MaxInt32 {
if fetchBytes > math.MaxInt32 || fetchBytes < 0 {
// This shouldn't happen because w should have been trimmed before sending the request.
// But we definitely don't want to request negative bytes by casting to int32, so add this safeguard.
return math.MaxInt32
Expand Down Expand Up @@ -123,7 +123,7 @@ func (w fetchWant) expectedBytes() int {
// We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request.
// Based on some testing 65% of under-estimations are by less than 5%. So we account for that.
const overFetchBytesFactor = 1.05
return int(overFetchBytesFactor * float64(w.estimatedBytesPerRecord*int(w.endOffset-w.startOffset)))
return int(overFetchBytesFactor * float64(int64(w.estimatedBytesPerRecord)*(w.endOffset-w.startOffset)))
}

type fetchResult struct {
Expand Down
102 changes: 102 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1220,3 +1221,104 @@ func (w waiterFunc) Wait() { w() }
type refresherFunc func()

func (r refresherFunc) ForceMetadataRefresh() { r() }

func TestFetchWant_MaxBytes(t *testing.T) {
testCases := map[string]struct {
fw fetchWant
expected int32
}{
"small fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 10,
estimatedBytesPerRecord: 100,
},
expected: 1_000_000, // minimum fetch size
},
"medium fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 1000,
estimatedBytesPerRecord: 1000,
},
expected: 1_050_000, // 1000 * 1000 * 1.05
},
"huge fetch with huge bytes per record; overflow risk": {
fw: fetchWant{
startOffset: 0,
endOffset: 2 << 31,
estimatedBytesPerRecord: 2 << 30,
},
expected: math.MaxInt32,
},
"negative product due to overflow": {
fw: fetchWant{
startOffset: 0,
endOffset: math.MaxInt64,
estimatedBytesPerRecord: math.MaxInt32,
},
expected: math.MaxInt32,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := tc.fw.MaxBytes()
assert.Equal(t, tc.expected, result)
assert.GreaterOrEqual(t, result, int32(0), "MaxBytes should never return negative values")
})
}
}

func TestFetchWant_UpdateBytesPerRecord(t *testing.T) {
baseWant := fetchWant{
startOffset: 100,
endOffset: 200,
estimatedBytesPerRecord: 1000,
}

testCases := map[string]struct {
lastFetchBytes int
lastFetchRecords int
expectedBytesPerRecord int
}{
"similar to estimate": {
lastFetchBytes: 10000,
lastFetchRecords: 10,
expectedBytesPerRecord: 1000,
},
"much larger than estimate": {
lastFetchBytes: 100000,
lastFetchRecords: 10,
expectedBytesPerRecord: 2800,
},
"much smaller than estimate": {
lastFetchBytes: 1000,
lastFetchRecords: 10,
expectedBytesPerRecord: 820,
},
"risk of overflow": {
lastFetchBytes: math.MaxInt64,
lastFetchRecords: 1,
expectedBytesPerRecord: math.MaxInt64/5 + int(float64(baseWant.estimatedBytesPerRecord)*0.8),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := baseWant.UpdateBytesPerRecord(tc.lastFetchBytes, tc.lastFetchRecords)
//1844674407370955961
// endOffset should never change
assert.Equal(t, baseWant.startOffset, result.startOffset, "startOffset should not change")
assert.Equal(t, baseWant.endOffset, result.endOffset, "endOffset should not change")

// Check the new bytes per record estimation. Because of large numbers and floats we allow for 0.1% error.
assert.InEpsilon(t, tc.expectedBytesPerRecord, result.estimatedBytesPerRecord, 0.001)

// Verify MaxBytes() doesn't overflow or return negative values
maxBytes := result.MaxBytes()
assert.GreaterOrEqual(t, maxBytes, int32(0), "MaxBytes should never return negative values")
assert.LessOrEqual(t, maxBytes, int32(math.MaxInt32), "MaxBytes should never exceed MaxInt32")
})
}
}

0 comments on commit dcc1397

Please sign in to comment.