Skip to content

Commit

Permalink
cleanup chotki.go
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 29, 2024
1 parent 9c0dd68 commit 02ffd7a
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 365 deletions.
520 changes: 210 additions & 310 deletions chotki.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions examples/object_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func TestObjectExample(t *testing.T) {
assert.Equal(t, decl[2].RdxType, rdx.Integer)

ex := Example{}
i := a.ObjectIterator(oid)
i := a.ObjectIterator(oid, nil)
assert.NotNil(t, i)
err = ex.Load(i)
assert.Nil(t, err)
assert.Equal(t, "Petrov", ex.Name)
assert.Equal(t, int64(42), ex.Score)

i2 := a.ObjectIterator(oid)
i2 := a.ObjectIterator(oid, nil)
assert.NotNil(t, i2)
ex.Score = 44
var changes protocol.Records
Expand All @@ -62,7 +62,7 @@ func TestObjectExample(t *testing.T) {
assert.Equal(t, oid+rdx.ProInc, eid)

ex2 := Example{}
i3 := a.ObjectIterator(oid)
i3 := a.ObjectIterator(oid, nil)
assert.NotNil(t, i3)
err = ex2.Load(i3)
assert.Nil(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestObjectExamleWithORM(t *testing.T) {
assert.Nil(t, err)

var exa Example
ita := a.ObjectIterator(rdx.IDFromString("1e-2"))
ita := a.ObjectIterator(rdx.IDFromString("1e-2"), nil)
assert.NotNil(t, ita)
err = exa.Load(ita)
assert.Nil(t, err)
Expand All @@ -120,7 +120,7 @@ func TestObjectExamleWithORM(t *testing.T) {
err = protocol.Pump(&syncera, &syncerb)
assert.Equal(t, io.EOF, err)

itb := b.ObjectIterator(rdx.IDFromString("1e-2"))
itb := b.ObjectIterator(rdx.IDFromString("1e-2"), nil)
assert.NotNil(t, itb)

var exb Example
Expand Down
107 changes: 90 additions & 17 deletions objects.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chotki

import (
"encoding/binary"
"fmt"
"unicode/utf8"

Expand All @@ -10,27 +11,56 @@ import (
"github.com/pkg/errors"
)

var ErrBadTypeDescription = errors.New("bad type description")
func OKey(id rdx.ID, rdt byte) (key []byte) {
var ret = [16]byte{'O'}
key = binary.BigEndian.AppendUint64(ret[:1], uint64(id))
key = append(key, rdt)
return
}

func hasUnsafeChars(text string) bool {
for _, l := range text {
if l < ' ' {
return true
}
const LidLKeyLen = 1 + 8 + 1

func OKeyIdRdt(key []byte) (id rdx.ID, rdt byte) {
if len(key) != LidLKeyLen {
return rdx.BadId, 0
}
return false

id = rdx.IDFromBytes(key[1 : LidLKeyLen-1])
rdt = key[LidLKeyLen-1]
return
}

var ErrObjectUnknown = errors.New("unknown object")
var ErrTypeUnknown = errors.New("unknown object type")
var ErrUnknownFieldInAType = errors.New("unknown field for the type")
var ErrBadValueForAType = errors.New("bad value for the type")
func VKey(id rdx.ID) (key []byte) {
var ret = [16]byte{'V'}
block := id & ^SyncBlockMask
key = binary.BigEndian.AppendUint64(ret[:1], uint64(block))
key = append(key, 'V')
return
}

func VKeyId(key []byte) rdx.ID {
if len(key) != LidLKeyLen {
return rdx.BadId
}
return rdx.IDFromBytes(key[1:])
}

type Field struct {
Name string
RdxType byte
}

func (f Field) Valid() bool {
for _, l := range f.Name { // has unsafe chars
if l < ' ' {
return false
}
}

return (f.RdxType >= 'A' && f.RdxType <= 'Z' &&
len(f.Name) > 0 && utf8.ValidString(f.Name))
}

type Fields []Field

func (f Fields) Find(name string) (ndx int) {
Expand All @@ -42,11 +72,37 @@ func (f Fields) Find(name string) (ndx int) {
return -1
}

func (f Field) Valid() bool {
return f.RdxType >= 'A' && f.RdxType <= 'Z' && len(f.Name) > 0 && utf8.ValidString(f.Name) && !hasUnsafeChars(f.Name)
func ObjectKeyRange(oid rdx.ID) (fro, til []byte) {
oid = oid & ^rdx.OffMask
return OKey(oid, 'O'), OKey(oid+rdx.ProInc, 0)
}

var ErrBadClass = errors.New("bad class description")
// returns nil for "not found"
func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator {
fro, til := ObjectKeyRange(oid)
io := pebble.IterOptions{
LowerBound: fro,
UpperBound: til,
}
var it *pebble.Iterator
if snap != nil {
it = snap.NewIter(&io)
} else {
it = cho.db.NewIter(&io)
}

if it.SeekGE(fro) { // fixme
id, rdt := OKeyIdRdt(it.Key())
if rdt == 'O' && id == oid {
// An iterator is returned from a function, it cannot be closed
return it
}
}
if it != nil {
_ = it.Close()
}
return nil
}

// todo []Field -> map[uint64]Field
func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error) {
Expand Down Expand Up @@ -93,7 +149,7 @@ func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error) {
}

func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, tlvs protocol.Records, err error) {
it := cho.ObjectIterator(oid)
it := cho.ObjectIterator(oid, nil)
if it == nil {
return rdx.BadId, nil, ErrObjectUnknown
}
Expand All @@ -119,7 +175,7 @@ func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, t
}

func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact protocol.Records, err error) {
it := cho.ObjectIterator(oid)
it := cho.ObjectIterator(oid, nil)
if it == nil {
err = ErrObjectUnknown
return
Expand Down Expand Up @@ -150,7 +206,7 @@ func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact proto
}

func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv protocol.Records, err error) {
it := cho.ObjectIterator(oid)
it := cho.ObjectIterator(oid, nil)
if it == nil {
return rdx.BadId, nil, ErrObjectUnknown
}
Expand Down Expand Up @@ -358,6 +414,23 @@ func (cho *Chotki) ObjectFieldMapTermId(fid rdx.ID) (themap rdx.MapTR, err error
return
}

func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte) {
key := OKey(id, 'A')
it := cho.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{'O'},
UpperBound: []byte{'P'},
})
defer it.Close()
if it.SeekGE(key) {
fact, r := OKeyIdRdt(it.Key())
if fact == id {
tlv = it.Value()
rdt = r
}
}
return
}

func EditTLV(off uint64, rdt byte, tlv []byte) (edit []byte) {
edit = append(edit, protocol.TinyRecord('F', rdx.ZipUint64(off))...)
edit = append(edit, protocol.Record(rdt, tlv)...)
Expand Down
13 changes: 7 additions & 6 deletions orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func (orm *ORM) Save(objs ...NativeObject) (err error) {
return ErrObjectUnknown
}
var it *pebble.Iterator
it, err = ObjectIterator(id, orm.Snap)
if err != nil {
it = orm.Host.ObjectIterator(id, orm.Snap)
if it == nil {
err = ErrObjectUnknown
break
}
cid := rdx.IDFromZipBytes(it.Value())
Expand Down Expand Up @@ -159,9 +160,9 @@ func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error {
if id == rdx.BadId {
return ErrObjectUnknown
}
it, err := ObjectIterator(id, snap)
if err != nil {
return err
it := orm.Host.ObjectIterator(id, snap)
if it == nil {
return ErrObjectUnknown
}
seq := orm.Snap.Seq()
for it.Next() {
Expand All @@ -175,7 +176,7 @@ func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error {
}
}
_ = it.Close()
return err
return nil
}

func (orm *ORM) UpdateAll() (err error) {
Expand Down
18 changes: 9 additions & 9 deletions packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error) {
v := protocol.Record('V', id.ZipBytes())
err = pb.Merge(VKey(ref), v, &WriteOptions)
err = pb.Merge(VKey(ref), v, &pebbleWriteOptions)
if err == nil {
err = pb.Merge(VKey(rdx.ID0), v, &WriteOptions)
err = pb.Merge(VKey(rdx.ID0), v, &pebbleWriteOptions)
}
return
}
Expand All @@ -27,7 +27,7 @@ func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err
d := rdx.UnzipUint64(dzip)
at := ref + rdx.ID(d) // fixme
rdt, bare, rest = protocol.TakeAny(rest)
err = batch.Merge(OKey(at, rdt), bare, &WriteOptions)
err = batch.Merge(OKey(at, rdt), bare, &pebbleWriteOptions)
}
return
}
Expand All @@ -36,7 +36,7 @@ func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err
_, rest := protocol.Take('M', body)
var vbody []byte
vbody, _ = protocol.Take('V', rest)
err = batch.Merge(VKey(rdx.ID0), vbody, &WriteOptions)
err = batch.Merge(VKey(rdx.ID0), vbody, &pebbleWriteOptions)
return
}

Expand All @@ -51,7 +51,7 @@ func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err
if !rdx.VValid(rec) {
err = ErrBadVPacket
} else {
err = batch.Merge(key, rec, &WriteOptions)
err = batch.Merge(key, rec, &pebbleWriteOptions)
}
}
return
Expand All @@ -65,15 +65,15 @@ func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err
err = batch.Merge(
OKey(id, 'C'),
desc,
&WriteOptions)
&pebbleWriteOptions)
return
}

func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error) {
err = batch.Merge(
OKey(id, lot),
ref.ZipBytes(),
&WriteOptions)
&pebbleWriteOptions)
rest := body
var fid rdx.ID
for fno := rdx.ID(1); len(rest) > 0 && err == nil; fno++ {
Expand Down Expand Up @@ -103,7 +103,7 @@ func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.
err = batch.Merge(
fkey,
rebar,
&WriteOptions)
&pebbleWriteOptions)
rest = rest[rlen:]
}
if err == nil {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls
err = batch.Merge(
fkey,
rebar,
&WriteOptions)
&pebbleWriteOptions)
hook, ok := cho.hooks.Load(fid)
if ok {
for _, h := range hook {
Expand Down
20 changes: 10 additions & 10 deletions protocol/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (n *Net) Listen(ctx context.Context, addr string) error {
}
n.listens.Store(addr, listener)

n.log.Debug("tlv: listening", "addr", addr)
n.log.Debug("net: listening", "addr", addr)

n.wg.Add(1)
go func() {
Expand Down Expand Up @@ -154,15 +154,15 @@ func (n *Net) KeepConnecting(ctx context.Context, addr string) {

conn, err := n.createConn(ctx, addr)
if err != nil {
n.log.Error("couldn't connect", "addr", addr, "err", err)
n.log.Error("net: couldn't connect", "addr", addr, "err", err)

time.Sleep(connBackoff)
connBackoff = min(MAX_RETRY_PERIOD, connBackoff*2)

continue
}

n.log.Debug("tlv: connected", "addr", addr)
n.log.Debug("net: connected", "addr", addr)

connBackoff = MIN_RETRY_PERIOD
n.keepPeer(ctx, addr, conn)
Expand Down Expand Up @@ -190,12 +190,12 @@ func (n *Net) KeepListening(ctx context.Context, addr string) {
}

// reconnects are the client's problem, just continue
n.log.Error("tlv: couldn't accept request", "addr", addr, "err", err)
n.log.Error("net: couldn't accept request", "addr", addr, "err", err)
continue
}

remoteAddr := conn.RemoteAddr().String()
n.log.Debug("tlv: accept connection", "addr", addr, "remoteAddr", remoteAddr)
n.log.Debug("net: accept connection", "addr", addr, "remoteAddr", remoteAddr)

n.wg.Add(1)
go func() {
Expand All @@ -206,11 +206,11 @@ func (n *Net) KeepListening(ctx context.Context, addr string) {

if l, ok := n.listens.LoadAndDelete(addr); ok {
if err := l.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
n.log.Error("tlv: couldn't correct close listener", "addr", addr, "err", err)
n.log.Error("net: couldn't correct close listener", "addr", addr, "err", err)
}
}

n.log.Debug("tlv: listener closed", "addr", addr)
n.log.Debug("net: listener closed", "addr", addr)
}

func (n *Net) keepPeer(ctx context.Context, addr string, conn net.Conn) {
Expand All @@ -220,13 +220,13 @@ func (n *Net) keepPeer(ctx context.Context, addr string, conn net.Conn) {

readErr, wrireErr, closeErr := peer.Keep(ctx)
if readErr != nil {
n.log.Error("tlv: couldn't read from peer", "addr", addr, "err", readErr)
n.log.Error("net: couldn't read from peer", "addr", addr, "err", readErr)
}
if wrireErr != nil {
n.log.Error("tlv: couldn't write to peer", "addr", addr, "err", wrireErr)
n.log.Error("net: couldn't write to peer", "addr", addr, "err", wrireErr)
}
if closeErr != nil {
n.log.Error("tlv: couldn't correct close peer", "addr", addr, "err", closeErr)
n.log.Error("net: couldn't correct close peer", "addr", addr, "err", closeErr)
}
}

Expand Down
Loading

0 comments on commit 02ffd7a

Please sign in to comment.