Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka replay speed: don't trim fetchWants #9919

Merged
merged 3 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fe
func (w fetchWant) Next() fetchWant {
n := fetchWantFrom(w.endOffset, w.targetMaxBytes, w.estimatedBytesPerRecord)
n.estimatedBytesPerRecord = w.estimatedBytesPerRecord
return n.trimIfTooBig()
return n
}

// MaxBytes returns the maximum number of bytes we can fetch in a single request.
// It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB.
func (w fetchWant) MaxBytes() int32 {
fetchBytes := w.expectedBytes()
if fetchBytes > math.MaxInt32 {
if fetchBytes > math.MaxInt32 || fetchBytes < 0 {
// This shouldn't happen because w should have been trimmed before sending the request.
// But we definitely don't want to request negative bytes by casting to int32, so add this safeguard.
return math.MaxInt32
Expand All @@ -114,28 +114,16 @@ func (w fetchWant) UpdateBytesPerRecord(lastFetchBytes int, lastFetchNumberOfRec
actualBytesPerRecord := float64(lastFetchBytes) / float64(lastFetchNumberOfRecords)
w.estimatedBytesPerRecord = int(currentEstimateWeight*float64(w.estimatedBytesPerRecord) + (1-currentEstimateWeight)*actualBytesPerRecord)

return w.trimIfTooBig()
return w
}

// expectedBytes returns how many bytes we'd need to accommodate the range of offsets using estimatedBytesPerRecord.
// They may be more than the kafka protocol supports (> MaxInt32). Use MaxBytes.
func (w fetchWant) expectedBytes() int {
func (w fetchWant) expectedBytes() int64 {
// We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request.
// Based on some testing 65% of under-estimations are by less than 5%. So we account for that.
const overFetchBytesFactor = 1.05
return int(overFetchBytesFactor * float64(w.estimatedBytesPerRecord*int(w.endOffset-w.startOffset)))
}

// trimIfTooBig adjusts the end offset if we expect to fetch too many bytes.
// It's capped at math.MaxInt32 bytes.
func (w fetchWant) trimIfTooBig() fetchWant {
if w.expectedBytes() <= math.MaxInt32 {
return w
}
// We are overflowing, so we need to trim the end offset.
// We do this by calculating how many records we can fetch with the max bytes, and then setting the end offset to that.
w.endOffset = w.startOffset + int64(math.MaxInt32/w.estimatedBytesPerRecord)
return w
return int64(overFetchBytesFactor * float64(int64(w.estimatedBytesPerRecord)*(w.endOffset-w.startOffset)))
}

type fetchResult struct {
Expand Down
101 changes: 101 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1220,3 +1221,103 @@ func (w waiterFunc) Wait() { w() }
type refresherFunc func()

func (r refresherFunc) ForceMetadataRefresh() { r() }

func TestFetchWant_MaxBytes(t *testing.T) {
testCases := map[string]struct {
fw fetchWant
expected int32
}{
"small fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 10,
estimatedBytesPerRecord: 100,
},
expected: 1_000_000, // minimum fetch size
},
"medium fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 1000,
estimatedBytesPerRecord: 1000,
},
expected: 1_050_000, // 1000 * 1000 * 1.05
},
"huge fetch with huge bytes per record; overflow risk": {
fw: fetchWant{
startOffset: 0,
endOffset: 2 << 31,
estimatedBytesPerRecord: 2 << 30,
},
expected: math.MaxInt32,
},
"negative product due to overflow": {
fw: fetchWant{
startOffset: 0,
endOffset: math.MaxInt64,
estimatedBytesPerRecord: math.MaxInt32,
},
expected: math.MaxInt32,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := tc.fw.MaxBytes()
assert.Equal(t, tc.expected, result)
assert.GreaterOrEqual(t, result, int32(0), "MaxBytes should never return negative values")
})
}
}

func TestFetchWant_UpdateBytesPerRecord(t *testing.T) {
baseWant := fetchWant{
startOffset: 100,
endOffset: 200,
estimatedBytesPerRecord: 1000,
}

testCases := map[string]struct {
lastFetchBytes int
lastFetchRecords int
expectedBytesPerRecord int
}{
"similar to estimate": {
lastFetchBytes: 10000,
lastFetchRecords: 10,
expectedBytesPerRecord: 1000,
},
"much larger than estimate": {
lastFetchBytes: 100000,
lastFetchRecords: 10,
expectedBytesPerRecord: 2800,
},
"much smaller than estimate": {
lastFetchBytes: 1000,
lastFetchRecords: 10,
expectedBytesPerRecord: 820,
},
"risk of overflow": {
lastFetchBytes: math.MaxInt64,
lastFetchRecords: 1,
expectedBytesPerRecord: math.MaxInt64/5 + int(float64(baseWant.estimatedBytesPerRecord)*0.8),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := baseWant.UpdateBytesPerRecord(tc.lastFetchBytes, tc.lastFetchRecords)

assert.Equal(t, baseWant.startOffset, result.startOffset, "startOffset should not change")
assert.Equal(t, baseWant.endOffset, result.endOffset, "endOffset should not change")

// Check the new bytes per record estimation. Because of large numbers and floats we allow for 0.1% error.
assert.InEpsilon(t, tc.expectedBytesPerRecord, result.estimatedBytesPerRecord, 0.001)

// Verify MaxBytes() doesn't overflow or return negative values
maxBytes := result.MaxBytes()
assert.GreaterOrEqual(t, maxBytes, int32(0), "MaxBytes should never return negative values")
assert.LessOrEqual(t, maxBytes, int32(math.MaxInt32), "MaxBytes should never exceed MaxInt32")
})
}
}
Loading