Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

script: Add watch command #60

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions any_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (t AnyTable) queryIndex(txn ReadTxn, index string, key string) (indexReadTx
return itxn, rawKey, err
}

func (t AnyTable) Changes(txn WriteTxn) (anyChangeIterator, error) {
return t.Meta.anyChanges(txn)
}

func (t AnyTable) TableHeader() []string {
zero := t.Meta.proto()
if tw, ok := zero.(TableWritable); ok {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/cilium/statedb
go 1.23

require (
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23
github.com/cilium/hive v0.0.0-20241011093954-8df06c41a157
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23 h1:RQSJdQVdxE9puF18G5RGZZi2jhBb2dtA6zI+HHMyY+Y=
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23/go.mod h1:pI2GJ1n3SLKIQVFrKF7W6A6gb6BQkZ+3Hp4PAEo5SuI=
github.com/cilium/hive v0.0.0-20241011093954-8df06c41a157 h1:8UuDJ7JPPoCaDfZ/WkU/aP3FtNCwdNQe+7fbzP+lZrk=
github.com/cilium/hive v0.0.0-20241011093954-8df06c41a157/go.mod h1:pI2GJ1n3SLKIQVFrKF7W6A6gb6BQkZ+3Hp4PAEo5SuI=
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d h1:p6MgATaKEB9o7iAsk9rlzXNDMNCeKPAkx4Y8f+Zq8X8=
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d/go.mod h1:3VLiLgs8wfjirkuYqos4t0IBPQ+sXtf3tFkChLm6ARM=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand All @@ -24,6 +24,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down
163 changes: 158 additions & 5 deletions script.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
"regexp"
"slices"
"strings"
"text/tabwriter"
"time"

"github.com/cilium/hive"
"github.com/cilium/hive/script"
"github.com/liggitt/tabwriter"
"golang.org/x/time/rate"
"gopkg.in/yaml.v3"
)

Expand All @@ -35,6 +36,7 @@ func ScriptCommands(db *DB) hive.ScriptCmdOut {
"prefix": PrefixCmd(db),
"list": ListCmd(db),
"lowerbound": LowerBoundCmd(db),
"watch": WatchCmd(db),
"initialized": InitializedCmd(db),
}
subCmdsList := strings.Join(slices.Collect(maps.Keys(subCmds)), ", ")
Expand Down Expand Up @@ -77,7 +79,7 @@ func TablesCmd(db *DB) script.Cmd {
func(s *script.State, args ...string) (script.WaitFunc, error) {
txn := db.ReadTxn()
tbls := db.GetTables(txn)
w := tabwriter.NewWriter(s.LogWriter(), 5, 4, 3, ' ', 0)
w := newTabWriter(s.LogWriter())
fmt.Fprintf(w, "Name\tObject count\tDeleted objects\tIndexes\tInitializers\tGo type\tLast WriteTxn\n")
for _, tbl := range tbls {
idxs := strings.Join(tbl.Indexes(), ", ")
Expand Down Expand Up @@ -207,7 +209,7 @@ func CompareCmd(db *DB) script.Cmd {
return script.Command(
script.CmdUsage{
Summary: "Compare table",
Args: "table file (-timeout=<dur>) (-grep=<pattern>)",
Args: "(-timeout=<dur>) (-grep=<pattern>) table file",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
flags := newCmdFlagSet()
Expand Down Expand Up @@ -264,7 +266,7 @@ func CompareCmd(db *DB) script.Cmd {
// Create the diff between 'lines' and the rows in the table.
equal := true
var diff bytes.Buffer
w := tabwriter.NewWriter(&diff, 5, 4, 3, ' ', 0)
w := newTabWriter(&diff)
fmt.Fprintf(w, " %s\n", joinByPositions(columnNames, columnPositions))

objs, watch := tbl.AllWatch(db.ReadTxn())
Expand Down Expand Up @@ -509,6 +511,65 @@ func runQueryCmd(query int, db *DB, s *script.State, args []string) (script.Wait
}, nil
}

func WatchCmd(db *DB) script.Cmd {
return script.Command(
script.CmdUsage{
Summary: "Watch a table for changes",
Args: "table",
},
func(s *script.State, args ...string) (script.WaitFunc, error) {
if len(args) < 1 {
return nil, fmt.Errorf("expected table name")
}

tbl, _, err := getTable(db, args[0])
if err != nil {
return nil, err
}
wtxn := db.WriteTxn(tbl.Meta)
iter, err := tbl.Changes(wtxn)
wtxn.Commit()
if err != nil {
return nil, err
}

header := tbl.TableHeader()
if header == nil {
return nil, fmt.Errorf("objects in table %q not TableWritable", tbl.Meta.Name())
}
tw := newTabWriter(&strikethroughWriter{w: s.LogWriter()})
fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t"))

limiter := rate.NewLimiter(10.0, 1)
for {
if err := limiter.Wait(s.Context()); err != nil {
break
}
changes, watch := iter.nextAny(db.ReadTxn())
for change := range changes {
row := change.Object.(TableWritable).TableRow()
if change.Deleted {
fmt.Fprintf(tw, "%s (deleted)%s", strings.Join(row, "\t"), magicStrikethroughNewline)
} else {
fmt.Fprintf(tw, "%s\n", strings.Join(row, "\t"))
}
}
tw.Flush()
if err := s.FlushLog(); err != nil {
return nil, err
}
select {
case <-watch:
case <-s.Context().Done():
return nil, nil
}
}
return nil, nil

},
)
}

func firstOfSeq2[A, B any](it iter.Seq2[A, B]) iter.Seq2[A, B] {
return func(yield func(a A, b B) bool) {
for a, b := range it {
Expand Down Expand Up @@ -576,7 +637,7 @@ func writeObjects(tbl *AnyTable, it iter.Seq2[any, Revision], w io.Writer, colum
if err != nil {
return err
}
tw := tabwriter.NewWriter(w, 5, 4, 3, ' ', 0)
tw := newTabWriter(w)
fmt.Fprintf(tw, "%s\n", strings.Join(header, "\t"))

for obj := range it {
Expand Down Expand Up @@ -682,3 +743,95 @@ func joinByPositions(row []string, positions []int) string {
}
return w.String()
}

// strikethroughWriter writes a line of text that is striken through
// if the line contains the magic character at the end before \n.
// This is used to strike through a tab-formatted line without messing
// up with the widths of the cells.
type strikethroughWriter struct {
buf []byte
strikethrough bool
w io.Writer
}

var (
// Magic character to use at the end of the line to denote that this should be
// striken through.
// This is to avoid messing up the width calculations in the tab writer, which
// would happen if ANSI codes were used directly.
magicStrikethrough = byte('\xfe')
magicStrikethroughNewline = "\xfe\n"
)

func stripTrailingWhitespace(buf []byte) []byte {
idx := bytes.LastIndexFunc(
buf,
func(r rune) bool {
return r != ' ' && r != '\t'
},
)
if idx > 0 {
return buf[:idx+1]
}
return buf
}

func (s *strikethroughWriter) Write(p []byte) (n int, err error) {
write := func(bs []byte) {
if err == nil {
_, e := s.w.Write(bs)
if e != nil {
err = e
}
}
}
for _, c := range p {
switch c {
case '\n':
s.buf = stripTrailingWhitespace(s.buf)

if s.strikethrough {
write(beginStrikethrough)
write(s.buf)
write(endStrikethrough)
} else {
write(s.buf)
}
write(newline)

s.buf = s.buf[:0] // reset len for reuse.
s.strikethrough = false

if err != nil {
return 0, err
}

case magicStrikethrough:
s.strikethrough = true

default:
s.buf = append(s.buf, c)
}
}
return len(p), nil
}

var (
// Use color red and the strikethrough escape
beginStrikethrough = []byte("\033[9m\033[31m")
pippolo84 marked this conversation as resolved.
Show resolved Hide resolved
endStrikethrough = []byte("\033[0m")
newline = []byte("\n")
)

var _ io.Writer = &strikethroughWriter{}

func newTabWriter(out io.Writer) *tabwriter.Writer {
const (
minWidth = 5
width = 4
padding = 3
padChar = ' '
flags = tabwriter.RememberWidths
)
return tabwriter.NewWriter(out, minWidth, width, padding, padChar, flags)
}
Loading