Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement local watcher #182

1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
72 changes: 72 additions & 0 deletions watcher/internal/mocks/AdjudicatorSubscription.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions watcher/internal/mocks/RegisterSubscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions watcher/internal/mocks/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2021 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package mocks contains generated mocks for use in tests.
// These mocks are generated using testify mock framework.
package mocks // import "perun.network/go-perun/watcher/internal/mocks"
80 changes: 80 additions & 0 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,92 @@
package local

import (
"context"
"sync"

"github.com/pkg/errors"

"perun.network/go-perun/channel"
"perun.network/go-perun/watcher"
)

type (
// Watcher implements a local watcher.
Watcher struct {
rs channel.RegisterSubscriber
*registry
}

// Config represents the configuration for initializing a local watcher.
Config struct {
RegisterSubscriber channel.RegisterSubscriber
}

ch struct {
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
id channel.ID
params *channel.Params
parent channel.ID

subChsAccess sync.Mutex
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
}
)

// NewWatcher initializes a local watcher.
//
// Local watcher implements pub-sub interfaces over "go channels".
func NewWatcher(cfg Config) (*Watcher, error) {
w := &Watcher{
rs: cfg.RegisterSubscriber,
registry: newRegistry(),
}
return w, nil
}

// StartWatchingLedgerChannel starts watching for a ledger channel.
func (w *Watcher) StartWatchingLedgerChannel(ctx context.Context, signedState channel.SignedState) (
watcher.StatesPub, watcher.AdjudicatorSub, error) {
return w.startWatching(ctx, channel.Zero, signedState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the usage channel.Zero. Even though it seems very unlikely that a channel will have ID zero, it would still be better to not make any assumptions about certain ID values.

It would be better to explicitly mark the parent parameter as optional. You could use a pointer for that and check for nil, which is also not super clean but at least it does not require any assumptions about the ID value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have updated the implementation to use nil instead of Channel.Zero See PRs #213 and #214.

}

// StartWatchingSubChannel starts watching for a sub-channel or virtual channel.
func (w *Watcher) StartWatchingSubChannel(ctx context.Context, parent channel.ID, signedState channel.SignedState) (
watcher.StatesPub, watcher.AdjudicatorSub, error) {
parentCh, ok := w.registry.retrieve(parent)
if !ok {
return nil, nil, errors.New("parent channel not registered with the watcher")
}
parentCh.subChsAccess.Lock()
defer parentCh.subChsAccess.Unlock()
return w.startWatching(ctx, parent, signedState)
}

func (w *Watcher) startWatching(ctx context.Context, parent channel.ID, signedState channel.SignedState) (
watcher.StatesPub, watcher.AdjudicatorSub, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The format looks odd. It looks like this is the first line of the function while in fact it is the beginning of the return parameters. If the line gets too long, you can place each parameter on a separate line, like

func (w *Watcher) StartWatchingSubChannel(
	ctx context.Context,
	parent channel.ID,
	signedState channel.SignedState,
) (watcher.StatesPub, watcher.AdjudicatorSub, error) {

I think this is more intuitive to read.

The same comment applies to all functions using that format.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have updated in PR #213.

id := signedState.State.ID
w.registry.lock()
defer w.registry.unlock()

if _, ok := w.registry.retrieveUnsafe(id); ok {
return nil, nil, errors.New("already watching for this channel")
}

_, err := w.rs.Subscribe(ctx, id)
if err != nil {
return nil, nil, errors.WithMessage(err, "subscribing to adjudicator events from blockchain")
}
statesPubSub := newStatesPubSub()
eventsToClientPubSub := newAdjudicatorEventsPubSub()
ch := newCh(id, parent, signedState.Params)

w.registry.addUnsafe(ch)

return statesPubSub, eventsToClientPubSub, nil
}

func newCh(id, parent channel.ID, params *channel.Params) *ch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to the file that defines ch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to #213

return &ch{
id: id,
params: params,
parent: parent,
}
}
100 changes: 100 additions & 0 deletions watcher/local/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2021 - See NOTICE file for copyright holders.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local_test

import (
"context"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

ethChannel "perun.network/go-perun/backend/ethereum/channel"
_ "perun.network/go-perun/backend/ethereum/channel/test" // For initilizing channeltest
"perun.network/go-perun/channel"
channeltest "perun.network/go-perun/channel/test"
"perun.network/go-perun/pkg/test"
"perun.network/go-perun/watcher/internal/mocks"
"perun.network/go-perun/watcher/local"
)

// func init() {
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
// l := logrus.New()
// l.SetLevel(logrus.DebugLevel)
// log.Set(plogrus.FromLogrus(l))
// }

func Test_StartWatching(t *testing.T) {
rng := test.Prng(t)
rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(&ethChannel.RegisteredSub{}, nil)

t.Run("happy/ledger_channel", func(t *testing.T) {
w := newWatcher(t, rs)
params, state := channeltest.NewRandomParamsAndState(rng, channeltest.WithVersion(0))
signedState := makeSignedStateWDummySigs(params, state)

statesPub, eventsSub, err := w.StartWatchingLedgerChannel(context.TODO(), signedState)

require.NoError(t, err)
require.NotNil(t, statesPub)
require.NotNil(t, eventsSub)

_, _, err = w.StartWatchingLedgerChannel(context.TODO(), signedState)
require.Error(t, err, "StartWatching twice for the same channel must error")
})

t.Run("happy/sub_channel", func(t *testing.T) {
w := newWatcher(t, rs)

// Start watching for ledger channel.
parentParams, parentState := channeltest.NewRandomParamsAndState(rng, channeltest.WithVersion(0))
parentSignedState := makeSignedStateWDummySigs(parentParams, parentState)
statesPub, eventsSub, err := w.StartWatchingLedgerChannel(context.TODO(), parentSignedState)
require.NoError(t, err)
require.NotNil(t, statesPub)
require.NotNil(t, eventsSub)

// Start watching for a sub-channel with unknown parent ch id.
childParams, childState := channeltest.NewRandomParamsAndState(rng, channeltest.WithVersion(0))
childSignedState := makeSignedStateWDummySigs(childParams, childState)
randomID := channeltest.NewRandomChannelID(rng)
_, _, err = w.StartWatchingSubChannel(context.TODO(), randomID, childSignedState)
require.Error(t, err, "Start watching for a sub-channel with unknown parent ch id must error")

// Start watching for a sub-channel with known parent ch id.
_, _, err = w.StartWatchingSubChannel(context.TODO(), parentState.ID, childSignedState)
require.NoError(t, err)
require.NotNil(t, statesPub)
require.NotNil(t, eventsSub)

// Repeat Start watching for the sub-channel.
_, _, err = w.StartWatchingSubChannel(context.TODO(), parentState.ID, childSignedState)
require.Error(t, err, "StartWatching twice for the same channel must error")
})
}

func newWatcher(t *testing.T, rs channel.RegisterSubscriber) *local.Watcher {
t.Helper()

w, err := local.NewWatcher(local.Config{RegisterSubscriber: rs})
require.NoError(t, err)
require.NotNil(t, w)
return w
}

func makeSignedStateWDummySigs(params *channel.Params, state *channel.State) channel.SignedState {
return channel.SignedState{Params: params, State: state}
}