diff --git a/README.md b/README.md index 9f072ff..e4566dd 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ API used to listen for connections or connect to servers. ## Getting started ### Prerequisites -**As of go-raknet version 1.13.0, go-raknet requires at least Go 1.21**. Version 1.12.1 of go-raknet is +**As of go-raknet version 1.14.0, go-raknet requires at least Go 1.22**. Version 1.12.1 of go-raknet is the last version of the library that supports Go 1.18 and above. ### Usage diff --git a/binary.go b/binary.go index 9d47079..7e61464 100644 --- a/binary.go +++ b/binary.go @@ -9,6 +9,13 @@ import ( // uint32, but is an alias for the sake of clarity. type uint24 uint32 +// Inc increments a uint24 and returns the old value. +func (u *uint24) Inc() (old uint24) { + ret := *u + *u += 1 + return ret +} + // readUint24 reads 3 bytes from the buffer passed and combines it into a // uint24. If there were no 3 bytes to read, an error is returned. func readUint24(buf *bytes.Buffer) (uint24, error) { diff --git a/conn.go b/conn.go index 82b21d9..23cd74d 100644 --- a/conn.go +++ b/conn.go @@ -2,7 +2,6 @@ package raknet import ( "bytes" - "context" "encoding" "errors" "fmt" @@ -10,6 +9,7 @@ import ( "io" "net" "net/netip" + "slices" "sync" "sync/atomic" "time" @@ -85,11 +85,6 @@ type Conn struct { // datagram sequence number. retransmission *resendMap - // readDeadline is a channel that receives a time.Time after a specific - // time. It is used to listen for timeouts in Read after calling - // SetReadDeadline. - readDeadline <-chan time.Time - lastActivity atomic.Pointer[time.Time] } @@ -111,8 +106,8 @@ func newConn(conn net.PacketConn, raddr net.Addr, mtu uint16, h connectionHandle packetQueue: newPacketQueue(), retransmission: newRecoveryQueue(), buf: bytes.NewBuffer(make([]byte, 0, mtu)), - ackBuf: bytes.NewBuffer(make([]byte, 0, 256)), - nackBuf: bytes.NewBuffer(make([]byte, 0, 256)), + ackBuf: bytes.NewBuffer(make([]byte, 0, 128)), + nackBuf: bytes.NewBuffer(make([]byte, 0, 64)), } t := time.Now() c.lastActivity.Store(&t) @@ -142,34 +137,34 @@ func (conn *Conn) startTicking() { case t := <-ticker.C: i++ conn.flushACKs() - if i%2 == 0 { - // Ping the other end periodically to prevent timeouts. - _ = conn.send(&message.ConnectedPing{ClientTimestamp: timestamp()}) - } if i%3 == 0 { conn.checkResend(t) } - if i%5 == 0 { - conn.mu.Lock() - if t.Sub(*conn.lastActivity.Load()) > time.Second*5+conn.retransmission.rtt()*2 { - // No activity for too long: Start timeout. - _ = conn.Close() - } - conn.mu.Unlock() - } if unix := conn.closing.Load(); unix != 0 { before := acksLeft conn.mu.Lock() acksLeft = len(conn.retransmission.unacknowledged) conn.mu.Unlock() + if before != 0 && acksLeft == 0 { conn.closeImmediately() } - since := time.Since(time.Unix(unix, 0)) - if (acksLeft == 0 && since > time.Second) || since > time.Second*8 { + if (acksLeft == 0 && since > time.Second) || since > time.Second*5 { conn.closeImmediately() } + continue + } + if i%5 == 0 { + // Ping the other end periodically to prevent timeouts. + _ = conn.send(&message.ConnectedPing{ClientTimestamp: timestamp()}) + + conn.mu.Lock() + if t.Sub(*conn.lastActivity.Load()) > time.Second*5+conn.retransmission.rtt()*2 { + // No activity for too long: Start timeout. + _ = conn.Close() + } + conn.mu.Unlock() } case <-conn.closed: return @@ -227,7 +222,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) { default: conn.mu.Lock() defer conn.mu.Unlock() - n, err := conn.write(b) + n, err = conn.write(b) return n, conn.error(err, "write") } } @@ -239,8 +234,7 @@ func (conn *Conn) Write(b []byte) (n int, err error) { // Write, write will not lock. func (conn *Conn) write(b []byte) (n int, err error) { fragments := split(b, conn.effectiveMTU()) - orderIndex := conn.orderIndex - conn.orderIndex++ + orderIndex := conn.orderIndex.Inc() splitID := uint16(conn.splitID) if len(fragments) > 1 { @@ -258,9 +252,7 @@ func (conn *Conn) write(b []byte) (n int, err error) { copy(pk.content, content) pk.orderIndex = orderIndex - pk.messageIndex = conn.messageIndex - conn.messageIndex++ - + pk.messageIndex = conn.messageIndex.Inc() if pk.split = len(fragments) > 1; pk.split { // If there were more than one fragment, the pk was split, so we // need to make sure we set the appropriate fields. @@ -284,13 +276,11 @@ func (conn *Conn) Read(b []byte) (n int, err error) { select { case pk := <-conn.packets: if len(b) < len(*pk) { - err = conn.error(errBufferTooSmall, "read") + err = conn.error(ErrBufferTooSmall, "read") } return copy(b, *pk), err case <-conn.closed: return 0, conn.error(net.ErrClosed, "read") - case <-conn.readDeadline: - return 0, conn.error(context.DeadlineExceeded, "read") } } @@ -303,8 +293,6 @@ func (conn *Conn) ReadPacket() (b []byte, err error) { return *pk, err case <-conn.closed: return nil, conn.error(net.ErrClosed, "read") - case <-conn.readDeadline: - return nil, conn.error(context.DeadlineExceeded, "read") } } @@ -338,35 +326,14 @@ func (conn *Conn) LocalAddr() net.Addr { return conn.conn.LocalAddr() } -// SetReadDeadline sets the read deadline of the connection. An error is -// returned only if the time passed is before time.Now(). Calling -// SetReadDeadline means the next Read call that exceeds the deadline will fail -// and return an error. Setting the read deadline to the default value of -// time.Time removes the deadline. -func (conn *Conn) SetReadDeadline(t time.Time) error { - if t.IsZero() { - conn.readDeadline = make(chan time.Time) - return nil - } - if t.Before(time.Now()) { - panic(fmt.Errorf("read deadline cannot be before now")) - } - conn.readDeadline = time.After(time.Until(t)) - return nil -} +// SetReadDeadline is unimplemented. It always returns ErrNotSupported. +func (conn *Conn) SetReadDeadline(time.Time) error { return ErrNotSupported } -// SetWriteDeadline has no behaviour. It is merely there to satisfy the -// net.Conn interface. -func (conn *Conn) SetWriteDeadline(time.Time) error { - return nil -} +// SetWriteDeadline is unimplemented. It always returns ErrNotSupported. +func (conn *Conn) SetWriteDeadline(time.Time) error { return ErrNotSupported } -// SetDeadline sets the deadline of the connection for both Read and Write. -// SetDeadline is equivalent to calling both SetReadDeadline and -// SetWriteDeadline. -func (conn *Conn) SetDeadline(t time.Time) error { - return conn.SetReadDeadline(t) -} +// SetDeadline is unimplemented. It always returns ErrNotSupported. +func (conn *Conn) SetDeadline(time.Time) error { return ErrNotSupported } // Latency returns a rolling average of rtt between the sending and the // receiving end of the connection. The rtt returned is updated continuously @@ -541,26 +508,14 @@ func (conn *Conn) receiveSplitPacket(p *packet) error { } m[p.splitIndex] = p.content - size := 0 - for _, fragment := range m { - if len(fragment) == 0 { - // We haven't yet received all split fragments, so we cannot add the - // packets together yet. - return nil - } - // First we calculate the total size required to hold the content of the - // combined content. - size += len(fragment) - } - - content := make([]byte, 0, size) - for _, fragment := range m { - content = append(content, fragment...) + if slices.ContainsFunc(m, func(i []byte) bool { return len(i) == 0 }) { + // We haven't yet received all split fragments, so we cannot add the + // packets together yet. + return nil } + p.content = slices.Concat(m...) delete(conn.splits, p.splitID) - - p.content = content return conn.receivePacket(p) } @@ -611,11 +566,10 @@ func (conn *Conn) handleACK(b []byte) error { } for _, sequenceNumber := range ack.packets { // Take out all stored packets from the recovery queue. - p, ok := conn.retransmission.acknowledge(sequenceNumber) - if ok { + if p, ok := conn.retransmission.acknowledge(sequenceNumber); ok { // Clear the packet and return it to the pool so that it may be // re-used. - p.content = nil + p.content = p.content[:0] packetPool.Put(p) } } @@ -654,9 +608,8 @@ func (conn *Conn) resend(sequenceNumbers []uint24) (err error) { // passed. It is assigned a new sequence number and added to the retransmission. func (conn *Conn) sendDatagram(pk *packet) error { conn.buf.WriteByte(bitFlagDatagram | bitFlagNeedsBAndAS) - newSeqNum := conn.seq - conn.seq++ - writeUint24(conn.buf, newSeqNum) + seq := conn.seq.Inc() + writeUint24(conn.buf, seq) pk.write(conn.buf) // We then send the pk to the connection. @@ -665,7 +618,7 @@ func (conn *Conn) sendDatagram(pk *packet) error { } // We then re-add the pk to the recovery queue in case the new one gets // lost too, in which case we need to resend it again. - conn.retransmission.add(newSeqNum, pk) + conn.retransmission.add(seq, pk) conn.buf.Reset() return nil } diff --git a/err.go b/err.go index 8840648..6b5bae4 100644 --- a/err.go +++ b/err.go @@ -6,8 +6,15 @@ import ( ) var ( - errBufferTooSmall = errors.New("a message sent was larger than the buffer used to receive the message into") - errListenerClosed = errors.New("use of closed listener") + // ErrBufferTooSmall is returned when Conn.Read is called with a byte slice + // that is too small to contain the packet to be read. + ErrBufferTooSmall = errors.New("a message sent was larger than the buffer used to receive the message into") + // ErrListenerClosed is returned when Listener.Accept is called on a closed + // listener. + ErrListenerClosed = errors.New("use of closed listener") + // ErrNotSupported is returned for deadline methods of a Conn, which are not + // supported on a raknet.Conn. + ErrNotSupported = errors.New("feature not supported") ) // error wraps the error passed into a net.OpError with the op as operation and diff --git a/listener.go b/listener.go index 66fb539..836d1eb 100644 --- a/listener.go +++ b/listener.go @@ -110,7 +110,7 @@ func Listen(address string) (*Listener, error) { func (listener *Listener) Accept() (net.Conn, error) { conn, ok := <-listener.incoming if !ok { - return nil, &net.OpError{Op: "accept", Net: "raknet", Source: nil, Addr: nil, Err: errListenerClosed} + return nil, &net.OpError{Op: "accept", Net: "raknet", Source: nil, Addr: nil, Err: ErrListenerClosed} } return conn, nil } diff --git a/resend_map.go b/resend_map.go index 3eee405..c884c3c 100644 --- a/resend_map.go +++ b/resend_map.go @@ -61,13 +61,13 @@ func (m *resendMap) remove(index uint24, mul int) (*packet, bool) { // into the recovery queue and the taking out of it again. It is measured over // the last delayRecordCount values add in. func (m *resendMap) rtt() time.Duration { - const averageDuration = time.Second * 5 + const maxRTT = time.Second * 5 var ( total, records time.Duration now = time.Now() ) for t, rtt := range m.delays { - if now.Sub(t) > averageDuration { + if now.Sub(t) > maxRTT { delete(m.delays, t) continue }