
Commit

cleanup
janezpodhostnik committed Nov 13, 2024
1 parent 6c193ad commit fd3f08d
Showing 7 changed files with 143 additions and 104 deletions.
29 changes: 15 additions & 14 deletions cmd/run/cmd.go
@@ -7,7 +7,6 @@ import (
"math/big"
"os"
"os/signal"
"runtime/pprof"
"strings"
"syscall"
"time"
@@ -29,15 +28,7 @@ import (
var Cmd = &cobra.Command{
Use: "run",
Short: "Runs the EVM Gateway Node",
Run: func(*cobra.Command, []string) {

f, err := os.Create("cpu.pprof")
if err != nil {
log.Fatal().Err(err).Msg("could not create cpu profile")
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()

Run: func(command *cobra.Command, _ []string) {
// create multi-key account
if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists {
bootstrap.RunCreateMultiKeyAccount()
@@ -49,13 +40,15 @@ var Cmd = &cobra.Command{
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(command.Context())
done := make(chan struct{})
ready := make(chan struct{})
go func() {
if err := bootstrap.Run(ctx, cfg, ready); err != nil {
defer close(done)
err := bootstrap.Run(ctx, cfg, ready)
if err != nil {
log.Err(err).Msg("failed to run bootstrap")
cancel()
os.Exit(1)
}
}()

@@ -64,7 +57,15 @@ var Cmd = &cobra.Command{
osSig := make(chan os.Signal, 1)
signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)

<-osSig
select {
case <-osSig:
log.Info().Msg("OS Signal to shutdown received, shutting down")
cancel()
case <-done:
log.Info().Msg("done, shutting down")
cancel()
}

log.Info().Msg("OS Signal to shutdown received, shutting down")
cancel()
},
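The rewritten `Run` handler drops the unconditional CPU profiling and ties the command's context to a `done` channel, so the gateway shuts down cleanly on either an OS signal or a bootstrap failure. A minimal, self-contained sketch of that shutdown pattern; the `run` function here is a placeholder, not the gateway's `bootstrap.Run`:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

// run stands in for bootstrap.Run: it blocks until its work finishes
// or the context is cancelled.
func run(ctx context.Context, ready chan<- struct{}) error {
	close(ready)
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	ready := make(chan struct{})
	go func() {
		defer close(done) // signal the main goroutine even on failure
		if err := run(ctx, ready); err != nil {
			log.Printf("failed to run: %v", err)
			cancel()
		}
	}()

	<-ready

	osSig := make(chan os.Signal, 1)
	signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)

	// Shut down on whichever comes first: an OS signal or the worker exiting.
	select {
	case <-osSig:
		log.Println("OS signal received, shutting down")
		cancel()
	case <-done:
		log.Println("done, shutting down")
		cancel()
	}
}
```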
2 changes: 2 additions & 0 deletions go.mod
@@ -18,6 +18,7 @@ require (
github.com/sethvargo/go-retry v0.2.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
go.uber.org/ratelimit v0.3.1
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.63.2
@@ -35,6 +36,7 @@ require (
github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -69,6 +69,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -762,6 +764,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
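The new `go.uber.org/ratelimit` dependency (and its transitive `github.com/benbjohnson/clock`) supplies the leaky-bucket limiter used further down in `cross-spork_client.go`. A tiny usage sketch of the library's API, assuming v0.3.x:

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/ratelimit"
)

func main() {
	// Allow at most 100 operations per second; WithoutSlack disables bursting.
	rl := ratelimit.New(100, ratelimit.WithoutSlack)

	prev := time.Now()
	for i := 0; i < 5; i++ {
		now := rl.Take()              // blocks until the next permitted slot
		fmt.Println(i, now.Sub(prev)) // roughly 10ms apart after the first call
		prev = now
	}
}
```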
18 changes: 9 additions & 9 deletions services/ingestion/engine.go
@@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
Int("cadence-event-length", events.Length()).
Msg("received new cadence evm events")

batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
e.log.Fatal().Err(err).Msg("failed to close batch")
}
}(batch)

// if heartbeat interval with no data still update the cadence height
if events.Empty() {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {
return fmt.Errorf(
"failed to update to latest cadence height: %d, during events ingestion: %w",
events.CadenceHeight(),
@@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return nil // nothing else to do, this was a heartbeat event with no event payloads
}

batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
e.log.Fatal().Err(err).Msg("failed to close batch")
}
}(batch)

// Step 1: Re-execute all transactions on the latest EVM block

// Step 1.1: Notify the `BlocksProvider` of the newly received EVM block
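Moving the batch creation above the heartbeat early-return means even a height-only update is written through the same pebble batch, and the deferred close now covers every exit path. A rough standalone sketch of that batch lifecycle; `setLatestCadenceHeight` and the `demo-db` path are illustrative stand-ins, not the gateway's actual storage API:

```go
package main

import (
	"encoding/binary"
	"log"

	"github.com/cockroachdb/pebble"
)

// setLatestCadenceHeight stands in for blocks.SetLatestCadenceHeight:
// it writes into the supplied batch instead of the DB directly, so the
// caller decides when (and whether) to commit.
func setLatestCadenceHeight(batch *pebble.Batch, height uint64) error {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], height)
	return batch.Set([]byte("latestCadenceHeight"), buf[:], nil)
}

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	batch := db.NewBatch()
	// Close the batch on every exit path, mirroring the engine's deferred close.
	defer func() {
		if err := batch.Close(); err != nil {
			log.Fatalf("failed to close batch: %v", err)
		}
	}()

	if err := setLatestCadenceHeight(batch, 12345); err != nil {
		log.Fatalf("failed to set height: %v", err)
	}

	// Commit the accumulated writes atomically; the deferred Close then releases the batch.
	if err := batch.Commit(pebble.Sync); err != nil {
		log.Fatalf("failed to commit batch: %v", err)
	}
}
```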
150 changes: 75 additions & 75 deletions services/ingestion/event_subscriber.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sort"
"time"

"github.com/onflow/cadence/common"
"github.com/onflow/flow-go/fvm/evm/events"
@@ -198,8 +197,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
// and check for each event it receives whether we reached the end, if we reach the end it will increase
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
// otherwise return.
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
// TODO(JanezP): if we are backfilling, its more efficient to request events in a batch
func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
@@ -208,106 +206,108 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan
}()

for {
// check if the current height is still in past sporks, and if not return since we are done with backfilling
if !r.client.IsPastSpork(height) {
// check if currentHeight is still in past sporks, and if not return since we are done with backfilling
if !r.client.IsPastSpork(currentHeight) {
r.logger.Info().
Uint64("height", height).
Uint64("height", currentHeight).
Msg("completed backfilling")

return
}

latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
currentHeight, err := r.backfillSpork(ctx, currentHeight, eventsChan)
if err != nil {
r.logger.Error().Err(err).Msg("error backfilling spork")
eventsChan <- models.NewBlockEventsError(err)
return
}

r.logger.Info().
Uint64("start-height", height).
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")
Uint64("next-height", currentHeight).
Msg("reached the end of spork, checking next spork")
}
}()

ticker := time.NewTicker(time.Millisecond * 10)
return eventsChan
}

maxRange := uint64(249)
for height < latestHeight {
// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method.
const maxRangeForGetEvents = uint64(249)

// TODO: do rate limiting better
<-ticker.C
func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

startHeight := height
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", startHeight, latestHeight))
endHeight := height + maxRange
if endHeight > latestHeight {
endHeight = latestHeight
}
lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromHeight)
if err != nil {
eventsChan <- models.NewBlockEventsError(err)
return 0, err
}

evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

//
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get block events")
eventsChan <- models.NewBlockEventsError(err)
return
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})
r.logger.Info().
Uint64("start-height", fromHeight).
Uint64("last-spork-height", lastHeight).
Msg("backfilling spork")

transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get block events")
eventsChan <- models.NewBlockEventsError(err)
return
}
for fromHeight < lastHeight {
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromHeight, lastHeight))

sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})
startHeight := fromHeight
endHeight := fromHeight + maxRangeForGetEvents
if endHeight > lastHeight {
endHeight = lastHeight
}

if len(transactions) != len(blocks) {
r.logger.Error().Msg("transactions and blocks have different length")
eventsChan <- models.NewBlockEventsError(err)
return
}
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

for i := range transactions {
if transactions[i].Height != blocks[i].Height {
r.logger.Error().Msg("transactions and blocks have different height")
eventsChan <- models.NewBlockEventsError(err)
return
}
// append the transaction events to the block events
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)
transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

evmEvents := models.NewBlockEvents(blocks[i])
height = evmEvents.Events.CadenceHeight() + 1
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

}
transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
if err != nil {
return 0, fmt.Errorf("failed to get block events: %w", err)
}

// sort both, just in case
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})
sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})

if len(transactions) != len(blocks) {
return 0, fmt.Errorf("transactions and blocks have different length")
}

for i := range transactions {
if transactions[i].Height != blocks[i].Height {
return 0, fmt.Errorf("transactions and blocks have different height")
}
ticker.Stop()

r.logger.Info().
Uint64("next-height", height).
Msg("reached the end of spork, checking next spork")
// append the transaction events to the block events
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)

evmEvents := models.NewBlockEvents(blocks[i])
eventsChan <- evmEvents

// advance the height
fromHeight = evmEvents.Events.CadenceHeight() + 1
}
}()

return eventsChan
}
return fromHeight, nil
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
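The backfill logic is now split into a thin loop plus `backfillSpork`, which pages through each spork in windows of `maxRangeForGetEvents` blocks instead of throttling with a 10 ms ticker. A simplified sketch of that paging idea; `backfillRange` and its `fetch` callback are hypothetical stand-ins, and unlike the real subscriber (which advances to the last processed Cadence height + 1) this version simply steps to the end of each window:

```go
package main

import "fmt"

// maxRangeForGetEvents mirrors the constant in event_subscriber.go: the
// widest height range a single GetEventsForHeightRange call may cover.
const maxRangeForGetEvents = uint64(249)

// backfillRange pages through [fromHeight, lastHeight] inclusively in windows
// of at most maxRangeForGetEvents blocks, invoking fetch for each window.
// It returns the next height to continue from.
func backfillRange(fromHeight, lastHeight uint64, fetch func(start, end uint64) error) (uint64, error) {
	for fromHeight <= lastHeight {
		startHeight := fromHeight
		endHeight := fromHeight + maxRangeForGetEvents
		if endHeight > lastHeight {
			endHeight = lastHeight
		}

		if err := fetch(startHeight, endHeight); err != nil {
			return 0, fmt.Errorf("failed to fetch range [%d, %d]: %w", startHeight, endHeight, err)
		}

		// advance past the window we just processed
		fromHeight = endHeight + 1
	}
	return fromHeight, nil
}

func main() {
	next, err := backfillRange(1000, 2000, func(start, end uint64) error {
		fmt.Printf("backfilling [%d / %d]\n", start, end)
		return nil
	})
	fmt.Println(next, err)
}
```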
30 changes: 24 additions & 6 deletions services/requester/cross-spork_client.go
@@ -5,26 +5,35 @@ import (
"fmt"

"github.com/onflow/cadence"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/access"
flowGo "github.com/onflow/flow-go/model/flow"
"github.com/rs/zerolog"
"go.uber.org/ratelimit"
"golang.org/x/exp/slices"

errs "github.com/onflow/flow-evm-gateway/models/errors"
)

type sporkClient struct {
firstHeight uint64
lastHeight uint64
client access.Client
firstHeight uint64
lastHeight uint64
client access.Client
getEventsForHeightRangeLimiter ratelimit.Limiter
}

// contains checks if the provided height is within the range of available heights
func (s *sporkClient) contains(height uint64) bool {
return height >= s.firstHeight && height <= s.lastHeight
}

func (s *sporkClient) GetEventsForHeightRange(
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
) ([]flow.BlockEvents, error) {
s.getEventsForHeightRangeLimiter.Take()

return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}

type sporkClients []*sporkClient

// addSpork will add a new spork host defined by the first and last height boundary in that spork.
@@ -48,6 +57,8 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error {
firstHeight: info.NodeRootBlockHeight,
lastHeight: header.Height,
client: client,
// TODO (JanezP): Make this configurable
getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack),
})

// make sure clients are always sorted
@@ -218,10 +229,17 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight(
func (c *CrossSporkClient) GetEventsForHeightRange(
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
) ([]flow.BlockEvents, error) {
// TODO: also make sure the endHeight is not too high
client, err := c.getClientForHeight(startHeight)
if err != nil {
return nil, err
}
endClient, err := c.getClientForHeight(endHeight)
if err != nil {
return nil, err
}
// there is one client reference per spork, so we can compare the clients
if endClient != client {
return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight)
}
return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}
