Skip to content

Commit

Permalink
conn.go: Removed (half broken, useless) deadline methods and cleaned …
Browse files Browse the repository at this point in the history
…up logic in (re)sending.
  • Loading branch information
Sandertv committed May 3, 2024
1 parent 9df78e9 commit e76728a
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 90 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ API used to listen for connections or connect to servers.
## Getting started

### Prerequisites
**As of go-raknet version 1.13.0, go-raknet requires at least Go 1.21**. Version 1.12.1 of go-raknet is
**As of go-raknet version 1.14.0, go-raknet requires at least Go 1.22**. Version 1.12.1 of go-raknet is
the last version of the library that supports Go 1.18 and above.

### Usage
Expand Down
7 changes: 7 additions & 0 deletions binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
// uint32, but is an alias for the sake of clarity.
type uint24 uint32

// Inc increments a uint24 and returns the old value.
func (u *uint24) Inc() (old uint24) {
ret := *u
*u += 1
return ret
}

// readUint24 reads 3 bytes from the buffer passed and combines it into a
// uint24. If there were no 3 bytes to read, an error is returned.
func readUint24(buf *bytes.Buffer) (uint24, error) {
Expand Down
121 changes: 37 additions & 84 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package raknet

import (
"bytes"
"context"
"encoding"
"errors"
"fmt"
"github.com/sandertv/go-raknet/internal/message"
"io"
"net"
"net/netip"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -85,11 +85,6 @@ type Conn struct {
// datagram sequence number.
retransmission *resendMap

// readDeadline is a channel that receives a time.Time after a specific
// time. It is used to listen for timeouts in Read after calling
// SetReadDeadline.
readDeadline <-chan time.Time

lastActivity atomic.Pointer[time.Time]
}

Expand All @@ -111,8 +106,8 @@ func newConn(conn net.PacketConn, raddr net.Addr, mtu uint16, h connectionHandle
packetQueue: newPacketQueue(),
retransmission: newRecoveryQueue(),
buf: bytes.NewBuffer(make([]byte, 0, mtu)),
ackBuf: bytes.NewBuffer(make([]byte, 0, 256)),
nackBuf: bytes.NewBuffer(make([]byte, 0, 256)),
ackBuf: bytes.NewBuffer(make([]byte, 0, 128)),
nackBuf: bytes.NewBuffer(make([]byte, 0, 64)),
}
t := time.Now()
c.lastActivity.Store(&t)
Expand Down Expand Up @@ -142,34 +137,34 @@ func (conn *Conn) startTicking() {
case t := <-ticker.C:
i++
conn.flushACKs()
if i%2 == 0 {
// Ping the other end periodically to prevent timeouts.
_ = conn.send(&message.ConnectedPing{ClientTimestamp: timestamp()})
}
if i%3 == 0 {
conn.checkResend(t)
}
if i%5 == 0 {
conn.mu.Lock()
if t.Sub(*conn.lastActivity.Load()) > time.Second*5+conn.retransmission.rtt()*2 {
// No activity for too long: Start timeout.
_ = conn.Close()
}
conn.mu.Unlock()
}
if unix := conn.closing.Load(); unix != 0 {
before := acksLeft
conn.mu.Lock()
acksLeft = len(conn.retransmission.unacknowledged)
conn.mu.Unlock()

if before != 0 && acksLeft == 0 {
conn.closeImmediately()
}

since := time.Since(time.Unix(unix, 0))
if (acksLeft == 0 && since > time.Second) || since > time.Second*8 {
if (acksLeft == 0 && since > time.Second) || since > time.Second*5 {
conn.closeImmediately()
}
continue
}
if i%5 == 0 {
// Ping the other end periodically to prevent timeouts.
_ = conn.send(&message.ConnectedPing{ClientTimestamp: timestamp()})

conn.mu.Lock()
if t.Sub(*conn.lastActivity.Load()) > time.Second*5+conn.retransmission.rtt()*2 {
// No activity for too long: Start timeout.
_ = conn.Close()
}
conn.mu.Unlock()
}
case <-conn.closed:
return
Expand Down Expand Up @@ -227,7 +222,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) {
default:
conn.mu.Lock()
defer conn.mu.Unlock()
n, err := conn.write(b)
n, err = conn.write(b)
return n, conn.error(err, "write")
}
}
Expand All @@ -239,8 +234,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) {
// Write, write will not lock.
func (conn *Conn) write(b []byte) (n int, err error) {
fragments := split(b, conn.effectiveMTU())
orderIndex := conn.orderIndex
conn.orderIndex++
orderIndex := conn.orderIndex.Inc()

splitID := uint16(conn.splitID)
if len(fragments) > 1 {
Expand All @@ -258,9 +252,7 @@ func (conn *Conn) write(b []byte) (n int, err error) {
copy(pk.content, content)

pk.orderIndex = orderIndex
pk.messageIndex = conn.messageIndex
conn.messageIndex++

pk.messageIndex = conn.messageIndex.Inc()
if pk.split = len(fragments) > 1; pk.split {
// If there were more than one fragment, the pk was split, so we
// need to make sure we set the appropriate fields.
Expand All @@ -284,13 +276,11 @@ func (conn *Conn) Read(b []byte) (n int, err error) {
select {
case pk := <-conn.packets:
if len(b) < len(*pk) {
err = conn.error(errBufferTooSmall, "read")
err = conn.error(ErrBufferTooSmall, "read")
}
return copy(b, *pk), err
case <-conn.closed:
return 0, conn.error(net.ErrClosed, "read")
case <-conn.readDeadline:
return 0, conn.error(context.DeadlineExceeded, "read")
}
}

Expand All @@ -303,8 +293,6 @@ func (conn *Conn) ReadPacket() (b []byte, err error) {
return *pk, err
case <-conn.closed:
return nil, conn.error(net.ErrClosed, "read")
case <-conn.readDeadline:
return nil, conn.error(context.DeadlineExceeded, "read")
}
}

Expand Down Expand Up @@ -338,35 +326,14 @@ func (conn *Conn) LocalAddr() net.Addr {
return conn.conn.LocalAddr()
}

// SetReadDeadline sets the read deadline of the connection. An error is
// returned only if the time passed is before time.Now(). Calling
// SetReadDeadline means the next Read call that exceeds the deadline will fail
// and return an error. Setting the read deadline to the default value of
// time.Time removes the deadline.
func (conn *Conn) SetReadDeadline(t time.Time) error {
if t.IsZero() {
conn.readDeadline = make(chan time.Time)
return nil
}
if t.Before(time.Now()) {
panic(fmt.Errorf("read deadline cannot be before now"))
}
conn.readDeadline = time.After(time.Until(t))
return nil
}
// SetReadDeadline is unimplemented. It always returns ErrNotSupported.
func (conn *Conn) SetReadDeadline(time.Time) error { return ErrNotSupported }

// SetWriteDeadline has no behaviour. It is merely there to satisfy the
// net.Conn interface.
func (conn *Conn) SetWriteDeadline(time.Time) error {
return nil
}
// SetWriteDeadline is unimplemented. It always returns ErrNotSupported.
func (conn *Conn) SetWriteDeadline(time.Time) error { return ErrNotSupported }

// SetDeadline sets the deadline of the connection for both Read and Write.
// SetDeadline is equivalent to calling both SetReadDeadline and
// SetWriteDeadline.
func (conn *Conn) SetDeadline(t time.Time) error {
return conn.SetReadDeadline(t)
}
// SetDeadline is unimplemented. It always returns ErrNotSupported.
func (conn *Conn) SetDeadline(time.Time) error { return ErrNotSupported }

// Latency returns a rolling average of rtt between the sending and the
// receiving end of the connection. The rtt returned is updated continuously
Expand Down Expand Up @@ -541,26 +508,14 @@ func (conn *Conn) receiveSplitPacket(p *packet) error {
}
m[p.splitIndex] = p.content

size := 0
for _, fragment := range m {
if len(fragment) == 0 {
// We haven't yet received all split fragments, so we cannot add the
// packets together yet.
return nil
}
// First we calculate the total size required to hold the content of the
// combined content.
size += len(fragment)
}

content := make([]byte, 0, size)
for _, fragment := range m {
content = append(content, fragment...)
if slices.ContainsFunc(m, func(i []byte) bool { return len(i) == 0 }) {
// We haven't yet received all split fragments, so we cannot add the
// packets together yet.
return nil
}
p.content = slices.Concat(m...)

delete(conn.splits, p.splitID)

p.content = content
return conn.receivePacket(p)
}

Expand Down Expand Up @@ -611,11 +566,10 @@ func (conn *Conn) handleACK(b []byte) error {
}
for _, sequenceNumber := range ack.packets {
// Take out all stored packets from the recovery queue.
p, ok := conn.retransmission.acknowledge(sequenceNumber)
if ok {
if p, ok := conn.retransmission.acknowledge(sequenceNumber); ok {
// Clear the packet and return it to the pool so that it may be
// re-used.
p.content = nil
p.content = p.content[:0]
packetPool.Put(p)
}
}
Expand Down Expand Up @@ -654,9 +608,8 @@ func (conn *Conn) resend(sequenceNumbers []uint24) (err error) {
// passed. It is assigned a new sequence number and added to the retransmission.
func (conn *Conn) sendDatagram(pk *packet) error {
conn.buf.WriteByte(bitFlagDatagram | bitFlagNeedsBAndAS)
newSeqNum := conn.seq
conn.seq++
writeUint24(conn.buf, newSeqNum)
seq := conn.seq.Inc()
writeUint24(conn.buf, seq)
pk.write(conn.buf)

// We then send the pk to the connection.
Expand All @@ -665,7 +618,7 @@ func (conn *Conn) sendDatagram(pk *packet) error {
}
// We then re-add the pk to the recovery queue in case the new one gets
// lost too, in which case we need to resend it again.
conn.retransmission.add(newSeqNum, pk)
conn.retransmission.add(seq, pk)
conn.buf.Reset()
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions err.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ import (
)

var (
errBufferTooSmall = errors.New("a message sent was larger than the buffer used to receive the message into")
errListenerClosed = errors.New("use of closed listener")
// ErrBufferTooSmall is returned when Conn.Read is called with a byte slice
// that is too small to contain the packet to be read.
ErrBufferTooSmall = errors.New("a message sent was larger than the buffer used to receive the message into")
// ErrListenerClosed is returned when Listener.Accept is called on a closed
// listener.
ErrListenerClosed = errors.New("use of closed listener")
// ErrNotSupported is returned for deadline methods of a Conn, which are not
// supported on a raknet.Conn.
ErrNotSupported = errors.New("feature not supported")
)

// error wraps the error passed into a net.OpError with the op as operation and
Expand Down
2 changes: 1 addition & 1 deletion listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func Listen(address string) (*Listener, error) {
func (listener *Listener) Accept() (net.Conn, error) {
conn, ok := <-listener.incoming
if !ok {
return nil, &net.OpError{Op: "accept", Net: "raknet", Source: nil, Addr: nil, Err: errListenerClosed}
return nil, &net.OpError{Op: "accept", Net: "raknet", Source: nil, Addr: nil, Err: ErrListenerClosed}
}
return conn, nil
}
Expand Down
4 changes: 2 additions & 2 deletions resend_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func (m *resendMap) remove(index uint24, mul int) (*packet, bool) {
// into the recovery queue and the taking out of it again. It is measured over
// the last delayRecordCount values add in.
func (m *resendMap) rtt() time.Duration {
const averageDuration = time.Second * 5
const maxRTT = time.Second * 5
var (
total, records time.Duration
now = time.Now()
)
for t, rtt := range m.delays {
if now.Sub(t) > averageDuration {
if now.Sub(t) > maxRTT {
delete(m.delays, t)
continue
}
Expand Down

0 comments on commit e76728a

Please sign in to comment.