Skip to content

Commit

Permalink
Merge branch 'main' of github.com:drpcorg/chotki
Browse files Browse the repository at this point in the history
  • Loading branch information
gritzko committed Apr 24, 2024
2 parents f70620b + b29614e commit f3256b3
Show file tree
Hide file tree
Showing 17 changed files with 746 additions and 525 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ ragel:
lint:
golangci-lint run ./...

.PHONY: tlsgen
tlsgen:
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 1

.PHONY: update-pebble
update-pebble:
go mod edit -replace github.com/cockroachdb/pebble=github.com/drpcorg/pebble@master
Expand Down
136 changes: 73 additions & 63 deletions chotki.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chotki

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -23,10 +24,11 @@ type Batch [][]byte
type Options struct {
pebble.Options

Orig uint64
Name string
RelaxedOrder bool
MaxLogLen int64
Src uint64
Name string
MaxLogLen int64
RelaxedOrder bool
RestoreNetwork bool
}

func (o *Options) SetDefaults() {
Expand All @@ -51,24 +53,20 @@ type Chotki struct {
last rdx.ID
src uint64

db *pebble.DB
dir string

syncs map[rdx.ID]*pebble.Batch
hooks map[rdx.ID][]Hook
hlock sync.Mutex

// queues to broadcast all new packets
outq map[string]toyqueue.DrainCloser

outlock sync.Mutex
lock sync.Mutex

db *pebble.DB
net *toytlv.Transport
dir string
opts Options

orm ORM

types map[rdx.ID]Fields
outq map[string]toyqueue.DrainCloser // queues to broadcast all new packets
syncs map[rdx.ID]*pebble.Batch
hooks map[rdx.ID][]Hook
types map[rdx.ID]Fields
lock sync.Mutex
hlock sync.Mutex
outlock sync.Mutex
}

var (
Expand Down Expand Up @@ -188,9 +186,9 @@ func Open(dirname string, opts Options) (*Chotki, error) {
return nil, err
}

conn := Chotki{
cho := Chotki{
db: db,
src: opts.Orig,
src: opts.Src,
dir: dirname,
opts: opts,
types: make(map[rdx.ID]Fields),
Expand All @@ -199,8 +197,16 @@ func Open(dirname string, opts Options) (*Chotki, error) {
outq: make(map[string]toyqueue.DrainCloser),
}

cho.net = toytlv.NewTransport(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{
Host: &cho,
Mode: SyncRWLive,
Name: conn.RemoteAddr().String(),
}
})

if !exists {
id0 := rdx.IDFromSrcSeqOff(opts.Orig, 0, 0)
id0 := rdx.IDFromSrcSeqOff(opts.Src, 0, 0)

init := append(toyqueue.Records(nil), Log0...)
init = append(init, toytlv.Record('Y',
Expand All @@ -209,63 +215,54 @@ func Open(dirname string, opts Options) (*Chotki, error) {
toytlv.Record('S', rdx.Stlv(opts.Name)),
))

if err = conn.Drain(init); err != nil {
if err = cho.Drain(init); err != nil {
return nil, err
}
}

vv, err := conn.VersionVector()
vv, err := cho.VersionVector()
if err != nil {
return nil, err
}

conn.last = vv.GetID(conn.src)

return &conn, nil
}

func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

tcp.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{Host: cho, Name: conn.RemoteAddr().String(), Mode: SyncRWLive}
})
cho.last = vv.GetID(cho.src)

return nil
return &cho, nil
}

func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

if err := cho.OpenTCP(tcp); err != nil {
return err
}
// ...
io := pebble.IterOptions{}
i := cho.db.NewIter(&io)
func (cho *Chotki) RestoreNet(ctx context.Context) error {
i := cho.db.NewIter(&pebble.IterOptions{})
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Listen(address)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
_ = cho.net.Listen(ctx, address)
}

for i.SeekGE([]byte{'c'}); i.Valid() && i.Key()[0] == 'C'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Connect(address)
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
_ = cho.net.Connect(ctx, address)
}

return nil
}

func (cho *Chotki) Listen(ctx context.Context, addr string) error {
return cho.net.Listen(ctx, addr)
}

func (cho *Chotki) Unlisten(addr string) error {
return cho.net.Unlisten(addr)
}

func (cho *Chotki) Connect(ctx context.Context, addr string) error {
return cho.net.Connect(ctx, addr)
}

func (cho *Chotki) Disconnect(addr string) error {
return cho.net.Disconnect(addr)
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
queue := toyqueue.RecordQueue{Limit: SyncOutQueueLimit}
cho.outlock.Lock()
Expand Down Expand Up @@ -425,18 +422,31 @@ func (cho *Chotki) Snapshot() pebble.Reader {

func (cho *Chotki) Close() error {
cho.lock.Lock()
if cho.db == nil {
cho.lock.Unlock()
return ErrClosed
defer cho.lock.Unlock()

if cho.net != nil {
if err := cho.net.Close(); err != nil {
return err
}
}

_ = cho.orm.Close()
if err := cho.db.Close(); err != nil {
return err

if cho.db != nil {
if err := cho.db.Close(); err != nil {
return err
}
}

clear(cho.outq)
clear(cho.syncs)
clear(cho.hooks)
clear(cho.types)

cho.db = nil
// todo
cho.src = 0
cho.last = rdx.ID0
cho.lock.Unlock()

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions chotki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestChotki_Create(t *testing.T) {
defer cancel()

a, err := Open(dirs[0], Options{
Orig: 0x1a,
Src: 0x1a,
Name: "test replica",
Options: pebble.Options{ErrorIfExists: true},
})
Expand All @@ -62,9 +62,9 @@ func TestChotki_Sync(t *testing.T) {
dirs, clear := testdirs(0xa, 0xb)
defer clear()

a, err := Open(dirs[0], Options{Orig: 0xa, Name: "test replica A"})
a, err := Open(dirs[0], Options{Src: 0xa, Name: "test replica A"})
assert.Nil(t, err)
b, err := Open(dirs[1], Options{Orig: 0xb, Name: "test replica B"})
b, err := Open(dirs[1], Options{Src: 0xb, Name: "test replica B"})
assert.Nil(t, err)

synca := Syncer{Host: a, Mode: SyncRW, Name: "a"}
Expand Down
6 changes: 3 additions & 3 deletions examples/object_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestORMExample(t *testing.T) {
defer os.RemoveAll("cho1e")
defer os.RemoveAll("cho1f")

a, err := chotki.Open("cho1e", chotki.Options{Orig: 0x1e, Name: "test replica"})
a, err := chotki.Open("cho1e", chotki.Options{Src: 0x1e, Name: "test replica"})
assert.Nil(t, err)
tid, err := a.NewClass(rdx.ID0,
chotki.Field{Name: "Name", RdxType: rdx.String},
Expand All @@ -31,7 +31,7 @@ func TestORMExample(t *testing.T) {
err = a.Close()
assert.Nil(t, err)

a, err = chotki.Open("cho1e", chotki.Options{Orig: 0x1e, Name: "test replica"})
a, err = chotki.Open("cho1e", chotki.Options{Src: 0x1e, Name: "test replica"})
assert.Nil(t, err)

var exa Example
Expand All @@ -45,7 +45,7 @@ func TestORMExample(t *testing.T) {
exa.Score = 103
// todo save the object

b, err := chotki.Open("cho1f", chotki.Options{Orig: 0x1f, Name: "another test replica"})
b, err := chotki.Open("cho1f", chotki.Options{Src: 0x1f, Name: "another test replica"})
assert.Nil(t, err)

syncera := chotki.Syncer{Host: a, Mode: chotki.SyncRW}
Expand Down
2 changes: 1 addition & 1 deletion examples/objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestTypes(t *testing.T) {
defer os.RemoveAll("cho1a")

a, err := chotki.Open("cho1a", chotki.Options{Orig: 0x1a, Name: "test replica A"})
a, err := chotki.Open("cho1a", chotki.Options{Src: 0x1a, Name: "test replica A"})
assert.Nil(t, err)

var tid, oid rdx.ID
Expand Down
4 changes: 2 additions & 2 deletions examples/plain_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestPlainObjectORM(t *testing.T) {
defer os.RemoveAll("cho10")

a, err := chotki.Open("cho10", chotki.Options{Orig: 0x10, Name: "test replica A"})
a, err := chotki.Open("cho10", chotki.Options{Src: 0x10, Name: "test replica A"})
assert.Nil(t, err)
orma := a.ObjectMapper()

Expand All @@ -36,7 +36,7 @@ func TestPlainObjectORM(t *testing.T) {
//a.DumpAll()
_ = a.Close()

a2, err := chotki.Open("cho10", chotki.Options{Orig: 0x10, Name: "test replica A"})
a2, err := chotki.Open("cho10", chotki.Options{Src: 0x10, Name: "test replica A"})
assert.Nil(t, err)
//a2.DumpAll()
orma2 := a2.ObjectMapper()
Expand Down
21 changes: 5 additions & 16 deletions repl/commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (repl *REPL) CommandCreate(arg *rdx.RDX) (id rdx.ID, err error) {

dirname := chotki.ReplicaDirName(src.Src())
repl.Host, err = chotki.Open(dirname, chotki.Options{
Orig: src.Src(),
Src: src.Src(),
Name: name,
Options: pebble.Options{ErrorIfExists: true},
})
Expand All @@ -78,7 +79,7 @@ func (repl *REPL) CommandOpen(arg *rdx.RDX) (rdx.ID, error) {

var err error
repl.Host, err = chotki.Open(dirname, chotki.Options{
Orig: src0.Src(),
Src: src0.Src(),
Options: pebble.Options{ErrorIfNotExists: true},
})
if err != nil {
Expand Down Expand Up @@ -110,10 +111,6 @@ func (repl *REPL) CommandDump(arg *rdx.RDX) (id rdx.ID, err error) {
}

func (repl *REPL) CommandClose(arg *rdx.RDX) (id rdx.ID, err error) {
if repl.tcp != nil {
_ = repl.tcp.Close()
repl.tcp = nil
}
if repl.snap != nil {
_ = repl.snap.Close()
repl.snap = nil
Expand Down Expand Up @@ -322,11 +319,7 @@ func (repl *REPL) CommandListen(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
if repl.tcp == nil {
repl.tcp = &toytlv.TCPDepot{}
repl.Host.OpenTCP(repl.tcp)
}
err = repl.tcp.Listen(addr)
err = repl.Host.Listen(context.Background(), addr)
}
return
}
Expand All @@ -339,11 +332,7 @@ func (repl *REPL) CommandConnect(arg *rdx.RDX) (id rdx.ID, err error) {
}
addr := rdx.Snative(rdx.Sparse(string(arg.Text)))
if err == nil {
if repl.tcp == nil {
repl.tcp = &toytlv.TCPDepot{}
repl.Host.OpenTCP(repl.tcp)
}
err = repl.tcp.Connect(addr)
err = repl.Host.Connect(context.Background(), addr)
}
return
}
Expand Down
2 changes: 0 additions & 2 deletions repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/cockroachdb/pebble"
"github.com/drpcorg/chotki"
"github.com/drpcorg/chotki/rdx"
"github.com/drpcorg/chotki/toytlv"
"github.com/ergochat/readline"
"io"
"os"
Expand All @@ -16,7 +15,6 @@ import (
// REPL per se.
type REPL struct {
Host *chotki.Chotki
tcp *toytlv.TCPDepot
rl *readline.Instance
snap pebble.Reader
}
Expand Down
Loading

0 comments on commit f3256b3

Please sign in to comment.