Skip to content

Commit

Permalink
✨ [watcher/local] Implement stop watching API
Browse files Browse the repository at this point in the history
- Usage of the API is described in doc comments and all the scenarios
  are covered in tests.

Signed-off-by: Manoranjith <[email protected]>
  • Loading branch information
manoranjith committed Sep 16, 2021
1 parent 4544b58 commit ba8bbb9
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 24 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
go.uber.org/goleak v1.1.11 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,13 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -457,6 +460,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -508,8 +512,10 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 h1:uCLL3g5wH2xjxVREVuAbP9JM5PPKjRbXKRa6IBjkzmU=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down Expand Up @@ -553,6 +559,7 @@ golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 4 additions & 0 deletions watcher/internal/mocks/AdjudicatorSubscription.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

160 changes: 136 additions & 24 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package local

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -33,6 +34,16 @@ const (
)

type (
// closer is the interface that wraps the close method.
closer interface {
close() error
}

// Closer is the interface that wraps the Close method.
Closer interface {
Close() error
}

// Watcher implements a local watcher.
Watcher struct {
rs channel.RegisterSubscriber
Expand All @@ -47,12 +58,19 @@ type (
ch struct {
id channel.ID
params *channel.Params
parent channel.ID

parent channel.ID
subChs map[channel.ID]struct{}
archivedSubChStates map[channel.ID]channel.SignedState

registeredVersion uint64
requestLatestTx chan struct{}
latestTx chan channel.Transaction

eventsFromChainSub Closer
eventsToClientSub closer
statesPubSub closer

subChsAccess sync.Mutex
}
)
Expand Down Expand Up @@ -81,9 +99,15 @@ func (w *Watcher) StartWatchingSubChannel(ctx context.Context, parent channel.ID
if !ok {
return nil, nil, errors.New("parent channel not registered with the watcher")
}

parentCh.subChsAccess.Lock()
defer parentCh.subChsAccess.Unlock()
return w.startWatching(ctx, parent, signedState)
statesPub, eventsSub, err := w.startWatching(ctx, parent, signedState)
if err != nil {
return nil, nil, err
}
parentCh.subChs[signedState.State.ID] = struct{}{}
return statesPub, eventsSub, nil
}

func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedState channel.SignedState) (
Expand All @@ -102,7 +126,7 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt
}
statesPubSub := newStatesPubSub()
eventsToClientPubSub := newAdjudicatorEventsPubSub()
ch := newCh(id, parent, signedState.Params)
ch := newCh(id, parent, signedState.Params, eventsFromChainSub, eventsToClientPubSub, statesPubSub)

w.registry.addUnsafe(ch)

Expand All @@ -116,14 +140,23 @@ func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedSt
return statesPubSub, eventsToClientPubSub, nil
}

func newCh(id, parent channel.ID, params *channel.Params) *ch {
func newCh(id, parent channel.ID, params *channel.Params,
eventsFromChainSub Closer, eventsToClientSub, statesPubSub closer) *ch {
return &ch{
id: id,
params: params,
parent: parent,
id: id,
params: params,

parent: parent,
subChs: make(map[channel.ID]struct{}),
archivedSubChStates: make(map[channel.ID]channel.SignedState),

registeredVersion: 0,
requestLatestTx: make(chan struct{}),
latestTx: make(chan channel.Transaction),

eventsFromChainSub: eventsFromChainSub,
eventsToClientSub: eventsToClientSub,
statesPubSub: statesPubSub,
}
}

Expand All @@ -132,20 +165,21 @@ func (ch *ch) retreiveLatestTx() channel.Transaction {
return <-ch.latestTx
}

func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTxn chan struct{},
func handleStatesFromClient(currentTx channel.Transaction, statesSub statesSub, requestLatestTx chan struct{},
latestTx chan channel.Transaction) {
var _tx channel.Transaction
var ok bool
for {
select {
case _tx, ok = <-statesSub.statesStream():
if !ok {
// TODO: Read error.
log.WithField("ID", currentTx.State.ID).Info("States sub closed by client. Shutting down handler")
return
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
case <-requestLatestTxn:
log.WithField("ID", currentTx.ID).Debugf("Received state from client %v", currentTx.State)
case <-requestLatestTx:
currentTx = receiveTxUntil(statesSub, time.NewTimer(statesFromClientWaitTime).C, currentTx)
latestTx <- currentTx
}
Expand All @@ -165,7 +199,7 @@ func receiveTxUntil(statesSub statesSub, timeout <-chan time.Time, currentTx cha
return currentTx // states sub was closed, send the latest event.
}
currentTx = _tx
log.WithField("ID", currentTx.ID).Debugf("Received state from client", currentTx.Version, currentTx.ID)
log.WithField("ID", currentTx.ID).Debugf("Received state from client %v", currentTx.State)
case <-timeout:
return currentTx // timer expired, send the latest the event.
}
Expand All @@ -187,7 +221,7 @@ func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSu
defer parent.subChsAccess.Unlock()

log := log.WithFields(log.Fields{"ID": e.ID(), "Version": e.Version()})
log.Debug("Received registered event from chain")
log.Debugf("Received registered event from chain: %v", e)

eventsToClientPubSub.publish(e)

Expand Down Expand Up @@ -215,15 +249,18 @@ func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSu
default:
}
}

err := eventsFromChainSub.Err()
log := log.WithField("ID", thisCh.id)
log.Errorf("Events from chain sub was closed with error: %v", err)
}

// registerDispute collects the latest transaction for the parent channel and
// each of its children. It then registers dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx := parentCh.retreiveLatestTx()
subStates := retreiveLatestSubStates(r, parentTx)
parentTx, subStates := retreiveLatestSubStates(r, parentCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -234,25 +271,28 @@ func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) e

parentCh.registeredVersion = parentTx.Version
for i := range subStates {
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subCh.registeredVersion = subStates[i].State.Version
subCh, ok := r.retrieve(parentTx.Allocation.Locked[i].ID)
if ok {
subCh.registeredVersion = subStates[i].State.Version
}
}
return nil
}

func retreiveLatestSubStates(r *registry, parentTx channel.Transaction) []channel.SignedState {
func retreiveLatestSubStates(r *registry, parent *ch) (channel.Transaction, []channel.SignedState) {
parentTx := parent.retreiveLatestTx()
subStates := make([]channel.SignedState, len(parentTx.Allocation.Locked))
for i := range parentTx.Allocation.Locked {
// Can be done concurrently.
subCh, _ := r.retrieve(parentTx.Allocation.Locked[i].ID)
subChTx := subCh.retreiveLatestTx()
subStates[i] = channel.SignedState{
Params: subCh.params,
State: subChTx.State,
Sigs: subChTx.Sigs,
subCh, ok := r.retrieve(parentTx.Allocation.Locked[i].ID)
if ok {
subChTx := subCh.retreiveLatestTx()
subStates[i] = makeSignedState(subCh.params, subChTx)
} else {
subStates[i] = parent.archivedSubChStates[parentTx.Allocation.Locked[i].ID]
}
}
return subStates
return parentTx, subStates
}

func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.AdjudicatorReq {
Expand All @@ -262,3 +302,75 @@ func makeAdjudicatorReq(params *channel.Params, tx channel.Transaction) channel.
Secondary: false,
}
}

// StopWatching stops watching for adjudicator events, closes the pub-sub
// instances and removes the channel from the registry.
//
// Client should invoke stop watching for all the sub-channels before invoking
// for the parent ledger channel.
//
// In case of stop watching for sub-channels, watcher ensures that, when it
// receives a registered event for its parent channel or any other sub-channels
// of the parent channel, it is able to successfully refute with the latest
// states for the ledger channel and all its sub-channels (even if the watcher
// has stopped watching for some of the sub-channel).
func (w *Watcher) StopWatching(id channel.ID) error {
ch, ok := w.retrieve(id)
if !ok {
return errors.New("channel not registered with the watcher")
}

parent := ch.parent
if parent != channel.Zero { // Sub channel.
parentCh, ok := w.retrieve(parent)
if !ok {
// Code MUST NOT reach this point
return errors.New("Fatal error: parent channel not registered with watcher")
}
parentCh.subChsAccess.Lock()
defer parentCh.subChsAccess.Unlock()
if _, ok := parentCh.retreiveLatestTx().SubAlloc(id); ok {
parentCh.archivedSubChStates[id] = makeSignedState(ch.params, ch.retreiveLatestTx())
}
delete(parentCh.subChs, id)
} else { // Ledger channel.
ch.subChsAccess.Lock()
defer ch.subChsAccess.Unlock()

if len(ch.subChs) > 0 {
return fmt.Errorf("cannot de-register when sub-channels are present: %d %v", len(ch.subChs), ch.id)
}
}

errMsg := closePubSubs(ch)
w.remove(ch.id)

if errMsg != "" {
err := errors.New("Stop Watching errors: " + errMsg)
log.WithField("id", id).Error(err.Error())
return err
}
return nil
}

func closePubSubs(ch *ch) string {
errMsg := ""
if err := ch.eventsFromChainSub.Close(); err != nil {
errMsg += fmt.Sprintf("closing events from chain sub: %v:", err)
}
if err := ch.eventsToClientSub.close(); err != nil {
errMsg += fmt.Sprintf("closing events to client pub-sub: %v:", err)
}
if err := ch.statesPubSub.close(); err != nil {
errMsg += fmt.Sprintf("closing states from client pub-sub: %v:", err)
}
return errMsg
}

func makeSignedState(params *channel.Params, tx channel.Transaction) channel.SignedState {
return channel.SignedState{
Params: params,
State: tx.State,
Sigs: tx.Sigs,
}
}
Loading

0 comments on commit ba8bbb9

Please sign in to comment.