Skip to content

Commit

Permalink
Merge pull request #554 from onflow/gregor/missing-block
Browse files Browse the repository at this point in the history
Event recovery from a missing block
  • Loading branch information
sideninja authored Sep 17, 2024
2 parents 7b75617 + 6dd8b6d commit f3f1838
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 57 deletions.
10 changes: 6 additions & 4 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ var (

// General errors

ErrInternal = errors.New("internal error")
ErrInvalid = errors.New("invalid")
ErrRecoverable = errors.New("recoverable")
ErrDisconnected = NewRecoverableError(errors.New("disconnected"))
ErrInternal = errors.New("internal error")
ErrInvalid = errors.New("invalid")
ErrRecoverable = errors.New("recoverable")
ErrDisconnected = NewRecoverableError(errors.New("disconnected"))
ErrMissingBlock = errors.New("missing block")
ErrMissingTransactions = errors.New("missing transactions")

// Transaction errors

Expand Down
13 changes: 10 additions & 3 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go/fvm/evm/events"
evmTypes "github.com/onflow/flow-go/fvm/evm/types"

errs "github.com/onflow/flow-evm-gateway/models/errors"
)

// isBlockExecutedEvent checks whether the given event contains block executed data.
Expand Down Expand Up @@ -115,9 +117,13 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
}
}

// safety check, we can't have an empty block with transactions
// safety check, we have a missing block in the events
if e.block == nil && len(e.transactions) > 0 {
return nil, fmt.Errorf("EVM block can not be nil if transactions are present, Flow block: %d", events.Height)
return nil, fmt.Errorf(
"%w EVM block nil at flow block: %d",
errs.ErrMissingBlock,
events.Height,
)
}

if e.block != nil {
Expand All @@ -127,7 +133,8 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
}
if e.block.TransactionHashRoot != txHashes.RootHash() {
return nil, fmt.Errorf(
"block %d references missing transaction/s",
"%w EVM block %d references missing transaction/s",
errs.ErrMissingTransactions,
e.block.Height,
)
}
Expand Down
116 changes: 79 additions & 37 deletions services/ingestion/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingestion

import (
"context"
"errors"
"fmt"

"github.com/onflow/cadence/runtime/common"
Expand Down Expand Up @@ -33,6 +34,9 @@ type RPCSubscriber struct {
chain flowGo.ChainID
heartbeatInterval uint64
logger zerolog.Logger

recovery bool
recoveredEvents []flow.Event
}

func NewRPCSubscriber(
Expand Down Expand Up @@ -107,26 +111,26 @@ func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan mod
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents {
events := make(chan models.BlockEvents)
eventsChan := make(chan models.BlockEvents)

_, err := r.client.GetBlockHeaderByHeight(ctx, height)
if err != nil {
err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err)
events <- models.NewBlockEventsError(err)
return events
eventsChan <- models.NewBlockEventsError(err)
return eventsChan
}

evs, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...)
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...)
if err != nil {
events <- models.NewBlockEventsError(
eventsChan <- models.NewBlockEventsError(
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
)
return events
return eventsChan
}

go func() {
defer func() {
close(events)
close(eventsChan)
}()

for ctx.Err() == nil {
Expand All @@ -135,49 +139,47 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
r.logger.Info().Msg("event ingestion received done signal")
return

case blockEvents, ok := <-evs:
case blockEvents, ok := <-eventStream:
if !ok {
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
err = ctx.Err()
}
events <- models.NewBlockEventsError(err)
eventsChan <- models.NewBlockEventsError(err)
return
}

evts := models.NewBlockEvents(blockEvents)
if evts.Err != nil {
r.logger.Warn().Err(err).Msgf(
"failed to parse EVM block events for Flow height: %d, retrying with gRPC API...",
blockEvents.Height,
)
// call the `GetEventsForHeightRange` gRPC API endpoint to fetch
// the EVM-related events, when event streaming returned an
// inconsistent response.
events <- r.fetchBlockEvents(ctx, blockEvents)
} else {
events <- models.NewBlockEvents(blockEvents)
evmEvents := models.NewBlockEvents(blockEvents)
// if events contain an error, or we are in a recovery mode
if evmEvents.Err != nil || r.recovery {
evmEvents = r.recover(ctx, blockEvents, evmEvents.Err)
// if we are still in recovery go to the next event
if r.recovery {
continue
}
}

eventsChan <- evmEvents

case err, ok := <-errChan:
if !ok {
var err error
err = errs.ErrDisconnected
if ctx.Err() != nil {
err = ctx.Err()
}
events <- models.NewBlockEventsError(err)
eventsChan <- models.NewBlockEventsError(err)
return
}

events <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
return
}
}
}()

return events
return eventsChan
}

// backfill will use the provided height and with the client for the provided spork will start backfilling
Expand Down Expand Up @@ -265,22 +267,18 @@ func (r *RPCSubscriber) blocksFilter() flow.EventFilter {
}
}

// fetchBlockEvents is used as a backup mechanism for fetching EVM-related
// fetchMissingData is used as a backup mechanism for fetching EVM-related
// events, when the event streaming API returns an inconsistent response.
// An inconsistent response could be an EVM block that references EVM
// transactions which are not present in the response.
// Under the hood, it uses the `GetEventsForHeightRange` gRPC API endpoint,
// making sure that we receive the expected events length for each event type
// and Flow height.
func (r *RPCSubscriber) fetchBlockEvents(
// transactions which are not present in the response. It falls back
// to using grpc requests instead of streaming.
func (r *RPCSubscriber) fetchMissingData(
ctx context.Context,
blockEvents flow.BlockEvents,
) models.BlockEvents {
blkEvents := flow.BlockEvents{
BlockID: blockEvents.BlockID,
Height: blockEvents.Height,
BlockTimestamp: blockEvents.BlockTimestamp,
}
// remove existing events
blockEvents.Events = nil

for _, eventType := range r.blocksFilter().EventTypes {
recoveredEvents, err := r.client.GetEventsForHeightRange(
ctx,
Expand All @@ -302,8 +300,52 @@ func (r *RPCSubscriber) fetchBlockEvents(
)
}

blkEvents.Events = append(blkEvents.Events, recoveredEvents[0].Events...)
blockEvents.Events = append(blockEvents.Events, recoveredEvents[0].Events...)
}

return models.NewBlockEvents(blockEvents)
}

// accumulateEventsMissingBlock appends the events of the provided Flow block to the
// events buffered by previous calls and tries to build EVM block events from the
// combined set.
//
// While building fails the subscriber stays in recovery mode and the buffer is kept,
// so the next call continues accumulating. Once building succeeds, recovery mode is
// switched off and the buffer is cleared.
//
// The result of the build attempt is returned in both cases; on failure it carries
// the error in its Err field.
func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
	// the attempt is always made over everything buffered so far plus the new events
	r.recoveredEvents = append(r.recoveredEvents, events.Events...)
	events.Events = r.recoveredEvents

	result := models.NewBlockEvents(events)
	if result.Err != nil {
		// still not a valid set of events, keep accumulating
		r.recovery = true
		return result
	}

	// recovered successfully, leave recovery mode and drop the buffer
	r.recovery = false
	r.recoveredEvents = nil

	return result
}

// recover tries to recover from invalid data sent over the event stream.
//
// Invalid data can be caused by a corrupted index or a network issue at the source,
// in which case we might miss one of the events (missing transaction), or it can be
// due to a failure of the system transaction which commits an EVM block, which results
// in a missing EVM block event while the transactions are present.
//
// Parameters:
//   - ctx: context passed on to the gRPC fallback request.
//   - events: the raw Flow block events as received from the stream.
//   - err: the error produced when building EVM events from `events`. It may be nil
//     when recover is called only because the subscriber is already in recovery mode.
//
// Returns:
//   - the result of accumulating events, if the block is missing or the subscriber
//     is already in recovery mode,
//   - the result of re-fetching the events, if transactions are missing,
//   - otherwise block events carrying the original error.
func (r *RPCSubscriber) recover(
	ctx context.Context,
	events flow.BlockEvents,
	err error,
) models.BlockEvents {
	if err != nil {
		r.logger.Warn().Err(err).Msgf(
			"failed to parse EVM block events for Flow height: %d, entering recovery",
			events.Height,
		)
	} else {
		// no parsing failure for this height, we are only here because an
		// earlier block put the subscriber into recovery mode
		r.logger.Debug().Msgf(
			"accumulating EVM events for Flow height: %d while in recovery",
			events.Height,
		)
	}

	switch {
	case r.recovery || errors.Is(err, errs.ErrMissingBlock):
		return r.accumulateEventsMissingBlock(events)
	case errors.Is(err, errs.ErrMissingTransactions):
		return r.fetchMissingData(ctx, events)
	default:
		// unknown failure, nothing to recover from, report the error as is
		return models.NewBlockEventsError(err)
	}
}
88 changes: 88 additions & 0 deletions services/ingestion/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingestion

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -66,6 +67,93 @@ func Test_Subscribing(t *testing.T) {
require.Equal(t, uint64(endHeight), prevHeight)
}

// Test_MissingBlockEvent checks that the subscriber recovers when the EVM block
// event is absent from a range of Flow blocks while the transaction events are
// still delivered. For Flow heights strictly between missingBlockHeight and
// foundBlockHeight only the transaction event is sent; at foundBlockHeight a block
// event is sent that references all the transactions sent since the block went
// missing. The subscriber is expected to emit only events that contain a block,
// and the event at foundBlockHeight must contain all the accumulated transactions.
func Test_MissingBlockEvent(t *testing.T) {
	const (
		startHeight        = uint64(1)
		endHeight          = uint64(20)
		missingBlockHeight = uint64(10)
		foundBlockHeight   = uint64(15)
	)

	currentClient, clientEvents := testutils.SetupClient(startHeight, endHeight)

	testLogger := zerolog.New(zerolog.NewTestWriter(t))
	client, err := requester.NewCrossSporkClient(currentClient, nil, testLogger, flowGo.Previewnet)
	require.NoError(t, err)

	subscriber := NewRPCSubscriber(client, 100, flowGo.Previewnet, zerolog.Nop())
	events := subscriber.Subscribe(context.Background(), 1)

	// hashes of the transactions sent while the block was missing, including the
	// transaction sent at the height where the block is found again
	var missingHashes []gethCommon.Hash

	// producer: feeds the mocked client stream with one Flow block per height
	go func() {
		defer close(clientEvents)

		for height := startHeight; height <= endHeight; height++ {
			txCdc, txEvent, tx, _, _ := newTransaction(height)
			blockCdc, _, blockEvent, _ := newBlock(height, []gethCommon.Hash{tx.Hash()})

			if height == foundBlockHeight {
				// the found block references every transaction that was missing a block
				missingHashes = append(missingHashes, tx.Hash())
				blockCdc, _, _, _ = newBlock(height, missingHashes)
			}

			payload := []flow.Event{
				{Value: txCdc, Type: string(txEvent.Etype)},
				{Value: blockCdc, Type: string(blockEvent.Etype)},
			}

			blockIsMissing := height > missingBlockHeight && height < foundBlockHeight
			if blockIsMissing {
				payload = payload[:1] // remove block
				missingHashes = append(missingHashes, tx.Hash())
			}

			clientEvents <- flow.BlockEvents{
				Height: height,
				Events: payload,
			}
		}
	}()

	// consumer: verifies every event emitted by the subscriber
	var prevHeight uint64
	for ev := range events {
		if prevHeight == endHeight {
			// after the last height the closed client stream is reported as a disconnect
			require.ErrorIs(t, ev.Err, errs.ErrDisconnected)
			break
		}

		require.NoError(t, ev.Err)
		block := ev.Events.Block()
		require.NotNil(t, block) // make sure all have blocks

		txs := ev.Events.Transactions()
		if block.Height != foundBlockHeight {
			// make sure all normal blocks have 1 tx
			require.Len(t, txs, 1)
		} else {
			// the missing block has all the transaction in between when it was missing
			require.Len(t, txs, int(foundBlockHeight-missingBlockHeight))
			for idx, want := range missingHashes {
				found := false
				for _, tx := range txs {
					if want.Cmp(tx.Hash()) == 0 {
						found = true
						break
					}
				}
				require.True(t, found, fmt.Sprintf("required hash not found at index %d %s", idx, want.String()))
			}
		}

		prevHeight = ev.Events.CadenceHeight()
	}

	// this makes sure we indexed all the events
	require.Equal(t, endHeight, prevHeight)
}

// Test that back-up fetching of EVM events is triggered when the
// Event Streaming API returns an inconsistent response.
// This scenario tests the happy path, when the back-up fetching of
Expand Down
Loading

0 comments on commit f3f1838

Please sign in to comment.