Skip to content

Commit

Permalink
✨ [watcher/local] Relay progressed events to client
Browse files Browse the repository at this point in the history
Signed-off-by: Manoranjith <[email protected]>
  • Loading branch information
manoranjith committed Sep 16, 2021
1 parent 6373839 commit 4544b58
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
3 changes: 3 additions & 0 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (w *Watcher) handleEventsFromChain(eventsFromChainSub channel.AdjudicatorSu
log.Debug("Registered successfully")
}
}()
case *channel.ProgressedEvent:
log.Debugf("Received progressed event from chain: %v", e)
eventsToClientPubSub.publish(e)
default:
}
}
Expand Down
64 changes: 64 additions & 0 deletions watcher/local/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ func Test_Watcher_Working(t *testing.T) {

rs.AssertExpectations(t)
})

t.Run("happy/progressed_event", func(t *testing.T) {
params, txs := randomTxsForSingleCh(rng, 3)
adjSub, trigger := setupAdjudicatorSub(makeProgressedEvent(txs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSub, nil)
w := newWatcher(t, rs)
// Start watching and publish states.
statesPub, eventsForClient := startWatchingForLedgerChannel(t, w, makeSignedStateWDummySigs(params, txs[0].State))
require.NoError(t, statesPub.Publish(txs[1]))

// Trigger events and assert.
triggerAdjEventAndExpectNotif(t, trigger, eventsForClient)
rs.AssertExpectations(t)
})
})

t.Run("ledger_channel_with_sub_channel", func(t *testing.T) {
Expand Down Expand Up @@ -181,6 +197,7 @@ func Test_Watcher_Working(t *testing.T) {

rs.AssertExpectations(t)
})
// nolint: dupl
t.Run("happy/newer_than_latest_state_registered", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 3)
Expand Down Expand Up @@ -286,6 +303,37 @@ func Test_Watcher_Working(t *testing.T) {

rs.AssertExpectations(t)
})

// nolint: dupl
t.Run("happy/progressed_event", func(t *testing.T) {
parentParams, parentTxs := randomTxsForSingleCh(rng, 3)
childParams, childTxs := randomTxsForSingleCh(rng, 3)
parentTxs[2].Allocation.Locked = []channel.SubAlloc{{ID: childTxs[0].ID}} // Add sub-channel to allocation.

adjSubParent, triggerParent := setupAdjudicatorSub(makeProgressedEvent(parentTxs[2])...)
adjSubChild, triggerChild := setupAdjudicatorSub(makeProgressedEvent(childTxs[2])...)

rs := &mocks.RegisterSubscriber{}
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubParent, nil).Once()
rs.On("Subscribe", mock.Anything, mock.Anything).Return(adjSubChild, nil).Once()

w := newWatcher(t, rs)
// Parent: Start watching and publish states.
parentSignedState := makeSignedStateWDummySigs(parentParams, parentTxs[0].State)
statesPubParent, eventsForClientParent := startWatchingForLedgerChannel(t, w, parentSignedState)
require.NoError(t, statesPubParent.Publish(parentTxs[1]))

// Child: Start watching and publish states.
childSignedState := makeSignedStateWDummySigs(childParams, childTxs[0].State)
statesPubChild, eventsForClientChild := startWatchingForSubChannel(t, w, childSignedState, parentTxs[0].State.ID)
require.NoError(t, statesPubChild.Publish(childTxs[1]))

// Parent, Child: Trigger events.
triggerAdjEventAndExpectNotif(t, triggerParent, eventsForClientParent)
triggerAdjEventAndExpectNotif(t, triggerChild, eventsForClientChild)

rs.AssertExpectations(t)
})
})
}

Expand Down Expand Up @@ -430,6 +478,22 @@ func makeRegisteredEvent(txs ...channel.Transaction) []channel.AdjudicatorEvent
return events
}

func makeProgressedEvent(txs ...channel.Transaction) []channel.AdjudicatorEvent {
events := make([]channel.AdjudicatorEvent, len(txs))
for i, tx := range txs {
events[i] = &channel.ProgressedEvent{
State: tx.State,
Idx: channel.Index(0),
AdjudicatorEventBase: channel.AdjudicatorEventBase{
IDV: tx.State.ID,
TimeoutV: &channel.ElapsedTimeout{},
VersionV: tx.State.Version,
},
}
}
return events
}

func startWatchingForLedgerChannel(t *testing.T, w *local.Watcher, signedState channel.SignedState) (
watcher.StatesPub, watcher.AdjudicatorSub) {
statesPub, eventsSub, err := w.StartWatchingLedgerChannel(context.TODO(), signedState)
Expand Down

0 comments on commit 4544b58

Please sign in to comment.