Skip to content

Commit

Permalink
refactor tcp transport
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 22, 2024
1 parent 3a8772b commit 7d1e3b3
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 413 deletions.
54 changes: 32 additions & 22 deletions chotki.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chotki

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -53,7 +54,7 @@ type Chotki struct {
src uint64

db *pebble.DB
net *toytlv.TCPDepot
net *toytlv.Transport
dir string
opts Options

Expand Down Expand Up @@ -188,36 +189,20 @@ func Open(dirname string, opts Options) (*Chotki, error) {
src: opts.Src,
dir: dirname,
opts: opts,
net: &toytlv.TCPDepot{},
types: make(map[rdx.ID]Fields),
hooks: make(map[rdx.ID][]Hook),
syncs: make(map[rdx.ID]*pebble.Batch),
outq: make(map[string]toyqueue.DrainCloser),
}

cho.net.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
cho.net = toytlv.NewTransport(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{
Host: &cho,
Mode: SyncRWLive,
Name: conn.RemoteAddr().String(),
}
})

if opts.RestoreNetwork {
i := cho.db.NewIter(&pebble.IterOptions{})
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Listen(address)
}

for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Connect(address)
}
}

if !exists {
id0 := rdx.IDFromSrcSeqOff(opts.Src, 0, 0)

Expand All @@ -243,12 +228,37 @@ func Open(dirname string, opts Options) (*Chotki, error) {
return &cho, nil
}

func (cho *Chotki) Listen(addr string) error {
return cho.net.Listen(addr)
func (cho *Chotki) RestoreNet(ctx context.Context) error {
i := cho.db.NewIter(&pebble.IterOptions{})
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Listen(ctx, address)
}

for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
address := string(i.Key()[1:])
_ = cho.net.Connect(ctx, address)
}

return nil
}

func (cho *Chotki) Listen(ctx context.Context, addr string) error {
return cho.net.Listen(ctx, addr)
}

func (cho *Chotki) Unlisten(addr string) error {
return cho.net.Unlisten(addr)
}

func (cho *Chotki) Connect(ctx context.Context, addr string) error {
return cho.net.Connect(ctx, addr)
}

func (cho *Chotki) Connect(addr string) error {
return cho.net.Connect(addr)
func (cho *Chotki) Disconnect(addr string) error {
return cho.net.Disconnect(addr)
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
Expand Down
5 changes: 3 additions & 2 deletions repl/commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -316,7 +317,7 @@ func (repl *REPL) CommandListen(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
err = repl.Host.Listen(addr)
err = repl.Host.Listen(context.Background(), addr)
}
return
}
Expand All @@ -329,7 +330,7 @@ func (repl *REPL) CommandConnect(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
err = repl.Host.Connect(addr)
err = repl.Host.Connect(context.Background(), addr)
}
return
}
Expand Down
194 changes: 194 additions & 0 deletions toytlv/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package toytlv

import (
"io"
"log/slog"
"net"
"sync"
"sync/atomic"
"time"

"github.com/drpcorg/chotki/toyqueue"
)

type ConnType = uint

const (
TCP ConnType = iota + 1
TLS
QUIC
)

const (
TYPICAL_MTU = 1500
MAX_OUT_QUEUE_LEN = 1 << 20 // 16MB of pointers is a lot

MAX_RETRY_PERIOD = time.Minute
MIN_RETRY_PERIOD = time.Second / 2
)

type Jack func(conn net.Conn) toyqueue.FeedDrainCloser

type Peer struct {
conn atomic.Pointer[net.Conn]
inout toyqueue.FeedDrainCloser
reconnect func() (net.Conn, error)

mu sync.Mutex
co sync.Cond

Protocol ConnType
KeepAlive bool
}

func (tcp *Peer) doRead() {
err := tcp.read()

if err != nil && err != ErrDisconnected {
_ = tcp.Close()

// TODO: error handling
slog.Error("couldn't read from conn", "err", err)
}
}

func (tcp *Peer) read() error {
var buf []byte
var err error

for {
conn := tcp.conn.Load()
if conn == nil {
break
}

if buf, err = appendRead(buf, *conn, TYPICAL_MTU); err != nil {
return err
}

var recs toyqueue.Records
if recs, buf, err = Split(buf); err != nil {
return err
} else if len(recs) == 0 {
time.Sleep(time.Millisecond)
continue
}

if err = tcp.inout.Drain(recs); err != nil {
return err
}
}

return nil
}

func (tcp *Peer) doWrite() {
var err error
var recs toyqueue.Records
for err == nil {
conn := tcp.conn.Load()
if conn == nil {
break
}

recs, err = tcp.inout.Feed()
b := net.Buffers(recs)
for len(b) > 0 && err == nil {
_, err = b.WriteTo(*conn)
}
}
if err != nil {
tcp.Close() // TODO err
}
}

func (tcp *Peer) Drain(recs toyqueue.Records) error {
return tcp.inout.Drain(recs)
}

func (tcp *Peer) Feed() (toyqueue.Records, error) {
return tcp.inout.Feed()
}

func (tcp *Peer) keepTalking() {
go tcp.doWrite()
go tcp.doRead()

talkBackoff, connBackoff := MIN_RETRY_PERIOD, MIN_RETRY_PERIOD

for tcp.reconnect == nil {
conntime := time.Now()

atLeast5min := conntime.Add(time.Minute * 5)
if atLeast5min.After(time.Now()) {
talkBackoff *= 2 // connected, tried to talk, failed => wait more
if talkBackoff > MAX_RETRY_PERIOD {
talkBackoff = MAX_RETRY_PERIOD
}
}

for {
if conn := tcp.conn.Load(); conn == nil {
break
}

time.Sleep(connBackoff + talkBackoff)
conn, err := tcp.reconnect()
if err != nil {
connBackoff = connBackoff * 2
if connBackoff > MAX_RETRY_PERIOD/2 {
connBackoff = MAX_RETRY_PERIOD
}
} else {
tcp.conn.Store(&conn)
connBackoff = MIN_RETRY_PERIOD
}
}
}
}

func (tcp *Peer) Close() error {
tcp.mu.Lock()
defer tcp.mu.Unlock()

// TODO writer closes on complete | 1 sec expired
if conn := tcp.conn.Swap(nil); conn != nil {
if err := (*conn).Close(); err != nil {
return err
}

tcp.co.Broadcast()
}

return nil
}

func roundPage(l int) int {
if (l & 0xfff) != 0 {
l = (l & ^0xfff) + 0x1000
}
return l
}

// appendRead reads data from io.Reader into the *spare space* of the provided buffer,
// i.e. those cap(buf)-len(buf) vacant bytes. If the spare space is smaller than
// lenHint, allocates (as reading less bytes might be unwise).
func appendRead(buf []byte, rdr io.Reader, lenHint int) ([]byte, error) {
avail := cap(buf) - len(buf)
if avail < lenHint {
want := roundPage(len(buf) + lenHint)
newbuf := make([]byte, want)
copy(newbuf[:], buf)
buf = newbuf[:len(buf)]
}
idle := buf[len(buf):cap(buf)]
n, err := rdr.Read(idle)
if err != nil {
return buf, err
}
if n == 0 {
return buf, io.EOF
}
buf = buf[:len(buf)+n]
return buf, nil
}
Loading

0 comments on commit 7d1e3b3

Please sign in to comment.