Skip to content

Commit

Permalink
Merge pull request #439 from pkg/feature/ReadFromWithConcurrency
Browse files Browse the repository at this point in the history
Export a ReadFromWithConcurrency function that permits ensuring concurrency usage.
  • Loading branch information
drakkan authored May 22, 2021
2 parents de44fbb + 61f5f29 commit 5b98d05
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,8 +1539,13 @@ func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
return len(b), nil
}

// readFromConcurrent implements ReaderFrom, but works concurrently rather than sequentially.
func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err error) {
// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Client’s max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
// Split the write into multiple maxPacket sized concurrent writes.
// This allows writes with a suitably large reader
// to transfer data at a much faster rate due to overlapping round trip times.
Expand All @@ -1560,12 +1565,9 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er
}
errCh := make(chan rwErr)

concurrency64 := remain/int64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
if concurrency64 > int64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
concurrency64 = int64(f.c.maxConcurrentRequests)
if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
concurrency = f.c.maxConcurrentRequests
}
// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
concurrency := int(concurrency64)

pool := newBufPool(concurrency, f.c.maxPacket)

Expand Down Expand Up @@ -1694,12 +1696,23 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
}

if remain < 0 {
remain = math.MaxInt64
// We can strongly assert that we want default max concurrency here.
return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
}

if remain > int64(f.c.maxPacket) {
// Only use concurrency, if it would be at least two read/writes.
return f.readFromConcurrent(r, remain)
// Otherwise, only use concurrency, if it would be at least two packets.

// This is the best reasonable guess we can make.
concurrency64 := remain/int64(f.c.maxPacket) + 1

// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
if concurrency64 > int64(f.c.maxConcurrentRequests) {
concurrency64 = int64(f.c.maxConcurrentRequests)
}

return f.ReadFromWithConcurrency(r, int(concurrency64))
}
}

Expand Down

0 comments on commit 5b98d05

Please sign in to comment.