Skip to content

Commit

Permalink
Share waitForStableBufferedRecords
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov committed Nov 14, 2024
1 parent 009d0b8 commit 47f2827
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ func TestConcurrentFetchers(t *testing.T) {
concurrency = 2
)

waitForStableBufferedRecords := func(t *testing.T, f fetcher) {
previousBufferedRecords := int64(0)
assert.Eventually(t, func() bool {
bufferedRecords := f.BufferedRecords()
stabilized := bufferedRecords == previousBufferedRecords
previousBufferedRecords = bufferedRecords
return stabilized
}, 2*time.Second, 100*time.Millisecond)
}

t.Run("respect context cancellation", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -1091,17 +1101,8 @@ func TestConcurrentFetchers(t *testing.T) {
// Create fetchers with tracking of uncompressed bytes
fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch, maxInflightBytes)

waitForStableBufferedRecords := func() {
previousBufferedRecords := int64(0)
assert.Eventually(t, func() bool {
bufferedRecords := fetchers.BufferedRecords()
stabilized := bufferedRecords == previousBufferedRecords
previousBufferedRecords = bufferedRecords
return stabilized
}, 2*time.Second, 100*time.Millisecond)
}
// Wait for buffered records to stabilize, we expect that they stabilize because the limit is in effect.
waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)

// Assert that we don't buffer more than maxInflightBytes
assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of records", maxInflightBytes)
Expand All @@ -1112,7 +1113,7 @@ func TestConcurrentFetchers(t *testing.T) {
require.Greater(t, totalConsumedRecords, 0, "Should have received some records")

// Allow time for more fetches
waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)

// Assert again that buffered bytes remain under limit
assert.LessOrEqualf(t, fetchers.BufferedRecords(), int64(maxInflightBytes), "Should still not buffer more than %d bytes after consuming some records", maxInflightBytes)
Expand All @@ -1124,7 +1125,7 @@ func TestConcurrentFetchers(t *testing.T) {
}

// Allow time for more fetches
waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)

pollFetchesAndAssertNoRecords(t, fetchers)
assert.Equal(t, totalProducedRecords, totalConsumedRecords, "Should have received all records eventually")
Expand Down Expand Up @@ -1156,16 +1157,6 @@ func TestConcurrentFetchers(t *testing.T) {
// Create fetchers early to ensure we don't miss any records
fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch, maxInflightBytes)

waitForStableBufferedRecords := func() {
previousBufferedRecords := int64(0)
assert.Eventually(t, func() bool {
bufferedRecords := fetchers.BufferedRecords()
stabilized := bufferedRecords == previousBufferedRecords
previousBufferedRecords = bufferedRecords
return stabilized
}, 2*time.Second, 100*time.Millisecond)
}

// Produce large records
largeValue := bytes.Repeat([]byte{'a'}, largeRecordSize)
for i := 0; i < largeRecordsCount; i++ {
Expand All @@ -1174,7 +1165,7 @@ func TestConcurrentFetchers(t *testing.T) {

t.Logf("Produced %d large records", largeRecordsCount)

waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)
t.Log("Buffered records stabilized")

assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of large records", maxInflightBytes)
Expand Down Expand Up @@ -1203,7 +1194,7 @@ func TestConcurrentFetchers(t *testing.T) {
t.Log("Consumed half of the small records")

// Assert that the buffer is well utilized.
waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)
t.Log("Buffered records stabilized")

assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of small records", maxInflightBytes)
Expand All @@ -1221,7 +1212,7 @@ func TestConcurrentFetchers(t *testing.T) {
assert.Equal(t, totalProducedRecords, consumedRecords, "Should have consumed all records")

// Verify no more records are buffered. First wait for the buffered records to stabilize.
waitForStableBufferedRecords()
waitForStableBufferedRecords(t, fetchers)

pollFetchesAndAssertNoRecords(t, fetchers)
})
Expand Down

0 comments on commit 47f2827

Please sign in to comment.