diff --git a/Makefile b/Makefile index 6061e7aa..710493d9 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,10 @@ check-tidy: go mod tidy git diff --exit-code +.PHONY: fix-lint +fix-lint: + golangci-lint run -v --fix ./... + .PHONY: generate generate: go get -d github.com/vektra/mockery/v2@v2.21.4 @@ -22,6 +26,8 @@ generate: mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks mockery --dir=storage --name=TransactionIndexer --output=storage/mocks mockery --dir=storage --name=AccountIndexer --output=storage/mocks + mockery --dir=storage --name=TraceIndexer --output=storage/mocks + mockery --all --dir=services/traces --output=services/traces/mocks mockery --all --dir=services/ingestion --output=services/ingestion/mocks mockery --dir=models --name=Engine --output=models/mocks diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index d1e88bd7..eff29668 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -19,6 +19,7 @@ import ( "github.com/onflow/flow-evm-gateway/models" "github.com/onflow/flow-evm-gateway/services/ingestion" "github.com/onflow/flow-evm-gateway/services/requester" + "github.com/onflow/flow-evm-gateway/services/traces" "github.com/onflow/flow-evm-gateway/storage" storageErrs "github.com/onflow/flow-evm-gateway/storage/errors" "github.com/onflow/flow-evm-gateway/storage/pebble" @@ -38,19 +39,12 @@ func Start(ctx context.Context, cfg *config.Config) error { transactions := pebble.NewTransactions(pebbleDB) receipts := pebble.NewReceipts(pebbleDB) accounts := pebble.NewAccounts(pebbleDB) + trace := pebble.NewTraces(pebbleDB) blocksBroadcaster := broadcast.NewBroadcaster() transactionsBroadcaster := broadcast.NewBroadcaster() logsBroadcaster := broadcast.NewBroadcaster() - // if database is not initialized require init height - if _, err := blocks.LatestCadenceHeight(); errors.Is(err, storageErrs.ErrNotInitialized) { - if err := blocks.InitHeights(cfg.InitCadenceHeight); err != nil { - return fmt.Errorf("failed to init the database: %w", err) - } - logger.Info().Msg("database initialized with 0 evm and cadence heights") - } - // this should only be used locally or for testing if cfg.ForceStartCadenceHeight != 0 { logger.Warn().Uint64("height", cfg.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!") @@ -86,6 +80,20 @@ func Start(ctx context.Context, cfg *config.Config) error { return err } + // if database is not initialized require init height + if _, err := blocks.LatestCadenceHeight(); errors.Is(err, storageErrs.ErrNotInitialized) { + cadenceHeight := cfg.InitCadenceHeight + cadenceBlock, err := client.GetBlockHeaderByHeight(ctx, cadenceHeight) + if err != nil { + return err + } + + if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID); err != nil { + return fmt.Errorf("failed to init the database: %w", err) + } + logger.Info().Msg("database initialized with 0 evm and cadence heights") + } + go func() { err := startServer( ctx, @@ -114,6 +122,7 @@ func Start(ctx context.Context, cfg *config.Config) error { transactions, receipts, accounts, + trace, blocksBroadcaster, transactionsBroadcaster, logsBroadcaster, @@ -134,6 +143,7 @@ func startIngestion( transactions storage.TransactionIndexer, receipts storage.ReceiptIndexer, accounts storage.AccountIndexer, + trace storage.TraceIndexer, blocksBroadcaster *broadcast.Broadcaster, transactionsBroadcaster *broadcast.Broadcaster, logsBroadcaster *broadcast.Broadcaster, @@ -163,8 +173,44 @@ func startIngestion( Uint64("missed-heights", blk.Height-latestCadenceHeight). Msg("indexing cadence height information") - subscriber := ingestion.NewRPCSubscriber(client, cfg.HeartbeatInterval, cfg.FlowNetworkID, logger) - engine := ingestion.NewEventIngestionEngine( + subscriber := ingestion.NewRPCSubscriber( + client, + cfg.HeartbeatInterval, + cfg.FlowNetworkID, + logger, + ) + + if cfg.TracesEnabled { + downloader, err := traces.NewGCPDownloader(cfg.TracesBucketName, logger) + if err != nil { + return err + } + + initHeight, err := blocks.LatestEVMHeight() + if err != nil { + return err + } + tracesEngine := traces.NewTracesIngestionEngine( + initHeight, + blocksBroadcaster, + blocks, + trace, + downloader, + logger, + ) + + go func() { + err = tracesEngine.Run(ctx) + if err != nil { + logger.Error().Err(err).Msg("traces ingestion engine failed to run") + panic(err) + } + }() + + <-tracesEngine.Ready() + } + + eventEngine := ingestion.NewEventIngestionEngine( subscriber, blocks, receipts, @@ -176,17 +222,18 @@ func startIngestion( logger, ) const retries = 15 - restartable := models.NewRestartableEngine(engine, retries, logger) + restartableEventEngine := models.NewRestartableEngine(eventEngine, retries, logger) go func() { - err = restartable.Run(ctx) + err = restartableEventEngine.Run(ctx) if err != nil { - logger.Error().Err(err).Msg("ingestion engine failed to run") + logger.Error().Err(err).Msg("event ingestion engine failed to run") panic(err) } }() - <-restartable.Ready() // wait for engine to be ready + // wait for ingestion engines to be ready + <-restartableEventEngine.Ready() logger.Info().Msg("ingestion start up successful") return nil diff --git a/config/config.go b/config/config.go index cb82968f..3ce6d298 100644 --- a/config/config.go +++ b/config/config.go @@ -78,6 +78,10 @@ type Config struct { ForceStartCadenceHeight uint64 // HeartbeatInterval sets custom heartbeat interval for events HeartbeatInterval uint64 + // TracesBucketName sets the GCP bucket name where transaction traces are being stored. + TracesBucketName string + // TracesEnabled sets whether the node is supporting transaction traces. + TracesEnabled bool } func FromFlags() (*Config, error) { @@ -110,6 +114,7 @@ func FromFlags() (*Config, error) { flag.IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client") flag.Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. This should only be used locally or for testing, never in production.") flag.StringVar(&filterExpiry, "filter-expiry", "5m", "Filter defines the time it takes for an idle filter to expire") + flag.StringVar(&cfg.TracesBucketName, "traces-gcp-bucket", "", "GCP bucket name where transaction traces are stored") flag.Parse() if coinbase == "" { @@ -212,6 +217,8 @@ func FromFlags() (*Config, error) { cfg.ForceStartCadenceHeight = forceStartHeight } + cfg.TracesEnabled = cfg.TracesBucketName != "" + // todo validate Config values return cfg, nil } diff --git a/go.mod b/go.mod index 664553cb..f0dfbfb3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/onflow/flow-evm-gateway go 1.22 require ( + cloud.google.com/go/storage v1.36.0 github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 github.com/goccy/go-json v0.10.2 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -13,12 +14,17 @@ require ( github.com/rs/cors v1.8.0 github.com/rs/zerolog v1.31.0 github.com/sethvargo/go-limiter v1.0.0 + github.com/sethvargo/go-retry v0.2.3 github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/sync v0.6.0 ) require ( + cloud.google.com/go v0.112.0 // indirect + cloud.google.com/go/compute v1.24.0 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v1.1.6 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect @@ -48,6 +54,7 @@ require ( github.com/ef-ds/deque v1.0.4 // indirect github.com/ethereum/c-kzg-4844 v0.4.0 // indirect github.com/ethereum/go-ethereum v1.13.10 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fxamacker/cbor/v2 v2.4.1-0.20230228173756-c0c9f774e40c // indirect github.com/fxamacker/circlehash v0.3.0 // indirect @@ -60,9 +67,13 @@ require ( github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect + github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect @@ -138,7 +149,6 @@ require ( github.com/psiemens/sconfig v0.1.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect - github.com/sethvargo/go-retry v0.2.3 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/slok/go-http-metrics v0.10.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -163,6 +173,9 @@ require ( github.com/vmihailenco/tagparser v0.1.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/blake3 v0.2.3 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect @@ -176,13 +189,16 @@ require ( golang.org/x/crypto v0.19.0 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.21.0 // indirect + golang.org/x/oauth2 v0.17.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.17.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.14.0 // indirect + google.golang.org/api v0.162.0 // indirect google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/grpc v1.63.2 // indirect diff --git a/go.sum b/go.sum index db4996b4..3ee833f7 100644 --- a/go.sum +++ b/go.sum @@ -1116,6 +1116,8 @@ github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230428030218-4003588d1b74/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cockroachdb/datadriven v1.0.0/go.mod h1:5Ib8Meh+jk1RlHIXej6Pzevx/NLlNvQB9pmSBZErGA4= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= @@ -1250,6 +1252,8 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY= github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= @@ -1485,10 +1489,12 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= +github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -1534,7 +1540,6 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/googleapis/gax-go v0.0.0-20161107002406-da06d194a00e h1:CYRpN206UTHUinz3VJoLaBdy1gEGeJNsqT0mvswDcMw= github.com/googleapis/gax-go v0.0.0-20161107002406-da06d194a00e/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/models/events.go b/models/events.go index 48b84307..4fd13634 100644 --- a/models/events.go +++ b/models/events.go @@ -1,12 +1,13 @@ package models import ( + "strings" + "github.com/onflow/cadence" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/types" gethTypes "github.com/onflow/go-ethereum/core/types" "golang.org/x/exp/slices" - "strings" ) // isBlockExecutedEvent checks whether the given event contains block executed data. @@ -103,6 +104,11 @@ func (c *CadenceEvents) CadenceHeight() uint64 { return c.events.Height } +// CadenceBlockID returns the Flow Cadence block ID. +func (c *CadenceEvents) CadenceBlockID() flow.Identifier { + return c.events.BlockID +} + // Length of the Cadence events emitted. func (c *CadenceEvents) Length() int { return len(c.events.Events) diff --git a/models/mocks/Engine.go b/models/mocks/Engine.go index 2127b294..2954041b 100644 --- a/models/mocks/Engine.go +++ b/models/mocks/Engine.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks @@ -17,10 +17,6 @@ type Engine struct { func (_m *Engine) Done() <-chan struct{} { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for Done") - } - var r0 <-chan struct{} if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { r0 = rf() @@ -37,10 +33,6 @@ func (_m *Engine) Done() <-chan struct{} { func (_m *Engine) Ready() <-chan struct{} { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for Ready") - } - var r0 <-chan struct{} if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { r0 = rf() @@ -57,10 +49,6 @@ func (_m *Engine) Ready() <-chan struct{} { func (_m *Engine) Run(ctx context.Context) error { ret := _m.Called(ctx) - if len(ret) == 0 { - panic("no return value specified for Run") - } - var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -76,12 +64,13 @@ func (_m *Engine) Stop() { _m.Called() } -// NewEngine creates a new instance of Engine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewEngine(t interface { +type mockConstructorTestingTNewEngine interface { mock.TestingT Cleanup(func()) -}) *Engine { +} + +// NewEngine creates a new instance of Engine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEngine(t mockConstructorTestingTNewEngine) *Engine { mock := &Engine{} mock.Mock.Test(t) diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index a193462c..4985824b 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/fvm/evm/types" gethTypes "github.com/onflow/go-ethereum/core/types" @@ -144,7 +145,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return err } for _, block := range blocks { - if err := e.indexBlock(events.CadenceHeight(), block); err != nil { + if err := e.indexBlock(events.CadenceHeight(), events.CadenceBlockID(), block); err != nil { return err } } @@ -162,7 +163,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return nil } -func (e *Engine) indexBlock(cadenceHeight uint64, block *types.Block) error { +func (e *Engine) indexBlock(cadenceHeight uint64, cadenceID flow.Identifier, block *types.Block) error { if block == nil { // safety check shouldn't happen return fmt.Errorf("can't process empty block") } @@ -185,12 +186,13 @@ func (e *Engine) indexBlock(cadenceHeight uint64, block *types.Block) error { Str("hash", blockHash.Hex()). Uint64("evm-height", block.Height). Uint64("cadence-height", cadenceHeight). + Str("cadence-id", cadenceID.String()). Str("parent-hash", block.ParentBlockHash.String()). Strs("tx-hashes", txHashes). Msg("new evm block executed event") // todo should probably be batch in the same as bellow tx - if err := e.blocks.Store(cadenceHeight, block); err != nil { + if err := e.blocks.Store(cadenceHeight, cadenceID, block); err != nil { return err } diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 23273663..1b524cfa 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -3,15 +3,16 @@ package ingestion import ( "context" "encoding/hex" - "github.com/onflow/flow-evm-gateway/services/ingestion/mocks" "math/big" "testing" + "github.com/onflow/flow-evm-gateway/services/ingestion/mocks" + "github.com/onflow/cadence" "github.com/onflow/cadence/runtime/common" + "github.com/onflow/flow-evm-gateway/models" - storageMock "github.com/onflow/flow-evm-gateway/storage/mocks" "github.com/onflow/flow-go-sdk" broadcast "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/fvm/evm/types" @@ -21,6 +22,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + storageMock "github.com/onflow/flow-evm-gateway/storage/mocks" ) func TestSerialBlockIngestion(t *testing.T) { @@ -78,8 +81,8 @@ func TestSerialBlockIngestion(t *testing.T) { require.NoError(t, err) blocks. - On("Store", mock.AnythingOfType("uint64"), mock.AnythingOfType("*types.Block")). - Return(func(h uint64, storeBlock *types.Block) error { + On("Store", mock.AnythingOfType("uint64"), mock.Anything, mock.AnythingOfType("*types.Block")). + Return(func(h uint64, id flow.Identifier, storeBlock *types.Block) error { assert.Equal(t, block, storeBlock) assert.Equal(t, cadenceHeight, h) storedCounter++ @@ -154,8 +157,8 @@ func TestSerialBlockIngestion(t *testing.T) { require.NoError(t, err) blocks. - On("Store", mock.AnythingOfType("uint64"), mock.AnythingOfType("*types.Block")). - Return(func(h uint64, storeBlock *types.Block) error { + On("Store", mock.AnythingOfType("uint64"), mock.Anything, mock.AnythingOfType("*types.Block")). + Return(func(h uint64, id flow.Identifier, storeBlock *types.Block) error { assert.Equal(t, block, storeBlock) assert.Equal(t, cadenceHeight, h) return nil @@ -198,6 +201,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { transactions := &storageMock.TransactionIndexer{} latestHeight := uint64(10) nextHeight := latestHeight + 1 + blockID := flow.Identifier{0x01} blocks := &storageMock.BlockIndexer{} blocks. @@ -252,9 +256,10 @@ func TestBlockAndTransactionIngestion(t *testing.T) { }() blocks. - On("Store", mock.AnythingOfType("uint64"), mock.AnythingOfType("*types.Block")). - Return(func(h uint64, storeBlock *types.Block) error { + On("Store", mock.AnythingOfType("uint64"), mock.Anything, mock.AnythingOfType("*types.Block")). + Return(func(h uint64, id flow.Identifier, storeBlock *types.Block) error { assert.Equal(t, block, storeBlock) + assert.Equal(t, blockID, id) assert.Equal(t, nextHeight, h) return nil }). @@ -289,7 +294,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { Type: string(txEvent.Etype), Value: txCdc, }}, - Height: nextHeight, + Height: nextHeight, + BlockID: blockID, }) close(eventsChan) @@ -350,8 +356,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { blocksFirst := false // flag indicating we stored block first blocks. - On("Store", mock.AnythingOfType("uint64"), mock.AnythingOfType("*types.Block")). - Return(func(h uint64, storeBlock *types.Block) error { + On("Store", mock.AnythingOfType("uint64"), mock.Anything, mock.AnythingOfType("*types.Block")). + Return(func(h uint64, id flow.Identifier, storeBlock *types.Block) error { blocksFirst = true return nil }). @@ -451,8 +457,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { // add new block for each height blocks. - On("Store", mock.AnythingOfType("uint64"), mock.AnythingOfType("*types.Block")). - Return(func(h uint64, storeBlock *types.Block) error { + On("Store", mock.AnythingOfType("uint64"), mock.Anything, mock.AnythingOfType("*types.Block")). + Return(func(h uint64, id flow.Identifier, storeBlock *types.Block) error { assert.Equal(t, block, storeBlock) assert.Equal(t, evmHeight, block.Height) assert.Equal(t, latestCadenceHeight+1, h) diff --git a/services/ingestion/mocks/EventSubscriber.go b/services/ingestion/mocks/EventSubscriber.go index 6a42a2e7..1c72b85c 100644 --- a/services/ingestion/mocks/EventSubscriber.go +++ b/services/ingestion/mocks/EventSubscriber.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks @@ -19,10 +19,6 @@ type EventSubscriber struct { func (_m *EventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents { ret := _m.Called(ctx, height) - if len(ret) == 0 { - panic("no return value specified for Subscribe") - } - var r0 <-chan models.BlockEvents if rf, ok := ret.Get(0).(func(context.Context, uint64) <-chan models.BlockEvents); ok { r0 = rf(ctx, height) @@ -35,12 +31,13 @@ func (_m *EventSubscriber) Subscribe(ctx context.Context, height uint64) <-chan return r0 } -// NewEventSubscriber creates a new instance of EventSubscriber. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewEventSubscriber(t interface { +type mockConstructorTestingTNewEventSubscriber interface { mock.TestingT Cleanup(func()) -}) *EventSubscriber { +} + +// NewEventSubscriber creates a new instance of EventSubscriber. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEventSubscriber(t mockConstructorTestingTNewEventSubscriber) *EventSubscriber { mock := &EventSubscriber{} mock.Mock.Test(t) diff --git a/services/traces/downloader.go b/services/traces/downloader.go new file mode 100644 index 00000000..fe46e21b --- /dev/null +++ b/services/traces/downloader.go @@ -0,0 +1,82 @@ +package traces + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "cloud.google.com/go/storage" + "github.com/onflow/flow-go-sdk" + "github.com/onflow/go-ethereum/common" + "github.com/rs/zerolog" +) + +const downloadTimeout = 5 * time.Minute + +type Downloader interface { + // Download traces or returning an error with the failure + Download(txID common.Hash, blockIO flow.Identifier) (json.RawMessage, error) +} + +var _ Downloader = &GCPDownloader{} + +type GCPDownloader struct { + client *storage.Client + logger zerolog.Logger + bucket *storage.BucketHandle +} + +func NewGCPDownloader(bucketName string, logger zerolog.Logger) (*GCPDownloader, error) { + if bucketName == "" { + return nil, fmt.Errorf("must provide bucket name") + } + + ctx := context.Background() + client, err := storage.NewClient(ctx) + if err != nil { + return nil, fmt.Errorf("storage.NewClient: %w", err) + } + + bucket := client.Bucket(bucketName) + // try accessing buckets to validate settings + if _, err = bucket.Attrs(ctx); err != nil { + return nil, fmt.Errorf("error accessing bucket: %s, make sure bucket exists: %w", bucketName, err) + } + + return &GCPDownloader{ + client: client, + logger: logger, + bucket: bucket, + }, nil +} + +func (g *GCPDownloader) Download(txID common.Hash, blockID flow.Identifier) (json.RawMessage, error) { + l := g.logger.With(). + Str("tx-id", txID.String()). + Str("cadence-block-id", blockID.String()). + Logger() + + l.Debug().Msg("downloading transaction trace") + + ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) + defer cancel() + + id := fmt.Sprintf("%s-%s", blockID.String(), txID.String()) + + rc, err := g.bucket.Object(id).NewReader(ctx) + if err != nil { + return nil, fmt.Errorf("failed to download id %s: %w", id, err) + } + defer rc.Close() + + trace, err := io.ReadAll(rc) + if err != nil { + return nil, fmt.Errorf("failed to read trace id %s: %w", id, err) + } + + l.Info().Int("trace-size", len(trace)).Msg("transaction trace downloaded") + + return trace, nil +} diff --git a/services/traces/engine.go b/services/traces/engine.go new file mode 100644 index 00000000..ae024e66 --- /dev/null +++ b/services/traces/engine.go @@ -0,0 +1,139 @@ +package traces + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/onflow/flow-go-sdk" + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/fvm/evm/types" + gethCommon "github.com/onflow/go-ethereum/common" + "github.com/rs/zerolog" + "github.com/sethvargo/go-retry" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/onflow/flow-evm-gateway/storage" +) + +var _ models.Engine = &Engine{} + +type Engine struct { + logger zerolog.Logger + status *models.EngineStatus + blocksBroadcaster *engine.Broadcaster + blocks storage.BlockIndexer + traces storage.TraceIndexer + downloader Downloader + currentHeight *atomic.Uint64 +} + +func NewTracesIngestionEngine( + initEVMHeight uint64, + blocksBroadcaster *engine.Broadcaster, + blocks storage.BlockIndexer, + traces storage.TraceIndexer, + downloader Downloader, + logger zerolog.Logger, +) *Engine { + height := &atomic.Uint64{} + height.Store(initEVMHeight) + + return &Engine{ + status: models.NewEngineStatus(), + logger: logger.With().Str("component", "trace-ingestion").Logger(), + currentHeight: height, + blocksBroadcaster: blocksBroadcaster, + blocks: blocks, + traces: traces, + downloader: downloader, + } +} + +func (e *Engine) Run(ctx context.Context) error { + // subscribe to new blocks + e.blocksBroadcaster.Subscribe(e) + + e.status.MarkReady() + return nil +} + +// Notify is a handler that is being used to subscribe for new EVM block notifications. +// This method should be non-blocking. +func (e *Engine) Notify() { + // proceed indexing the next height + height := e.currentHeight.Add(1) + + l := e.logger.With().Uint64("evm-height", height).Logger() + + block, err := e.blocks.GetByHeight(height) + if err != nil { + l.Error().Err(err).Msg("failed to get block") + return + } + + cadenceID, err := e.blocks.GetCadenceID(height) + if err != nil { + l.Error().Err(err).Msg("failed to get cadence block ID") + return + } + + go e.indexBlockTraces(block, cadenceID) +} + +// indexBlockTraces iterates the block transaction hashes and tries to download the traces +func (e *Engine) indexBlockTraces(evmBlock *types.Block, cadenceBlockID flow.Identifier) { + ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) + defer cancel() + + const maxConcurrentDownloads = 5 // limit number of concurrent downloads + limiter := make(chan struct{}, maxConcurrentDownloads) + + wg := sync.WaitGroup{} + + for _, h := range evmBlock.TransactionHashes { + wg.Add(1) + limiter <- struct{}{} // acquire a slot + + go func(h gethCommon.Hash) { + defer wg.Done() + defer func() { <-limiter }() // release a slot after done + + l := e.logger.With(). + Str("tx-id", h.String()). + Str("cadence-block-id", cadenceBlockID.String()). + Logger() + + err := retry.Fibonacci(ctx, time.Second*1, func(ctx context.Context) error { + trace, err := e.downloader.Download(h, cadenceBlockID) + if err != nil { + l.Warn().Err(err).Msg("retrying failed download") + return retry.RetryableError(err) + } + + return e.traces.StoreTransaction(h, trace) + }) + + if err != nil { + l.Error().Err(err).Msg("failed to download trace") + return + } + l.Info().Msg("trace downloaded successfully") + }(h) + } + + wg.Wait() +} + +func (e *Engine) Stop() { + e.status.MarkStopped() +} + +func (e *Engine) Done() <-chan struct{} { + return e.status.IsDone() +} + +func (e *Engine) Ready() <-chan struct{} { + return e.status.IsReady() +} diff --git a/services/traces/engine_test.go b/services/traces/engine_test.go new file mode 100644 index 00000000..33fff570 --- /dev/null +++ b/services/traces/engine_test.go @@ -0,0 +1,275 @@ +package traces + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "strings" + "testing" + "time" + + "github.com/onflow/flow-go-sdk" + broadcast "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/fvm/evm/types" + gethCommon "github.com/onflow/go-ethereum/common" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-evm-gateway/services/traces/mocks" + storageMock "github.com/onflow/flow-evm-gateway/storage/mocks" +) + +// this test makes sure once a notification for a new block is triggered +// the block transaction hashes are iterated, and for each a trace is +// downloaded and stored. +func TestTraceIngestion(t *testing.T) { + t.Run("successful single block ingestion", func(t *testing.T) { + blockBroadcaster := broadcast.NewBroadcaster() + blocks := &storageMock.BlockIndexer{} + trace := &storageMock.TraceIndexer{} + downloader := &mocks.Downloader{} + + txTrace := func(id gethCommon.Hash) json.RawMessage { + return json.RawMessage(fmt.Sprintf(`{ + "id": "%s", + "from":"0x42fdd562221741a1db62a0f69a5a680367f07e33", + "gas":"0x15f900", + "gasUsed":"0x387dc", + "to":"0xca11bde05977b3631167028862be2a173976ca11" + }`, id.String())) + } + + latestHeight := uint64(0) + blockID := flow.Identifier{0x09} + hashes := []gethCommon.Hash{{0x1}, {0x2}, {0x3}} + + blocks. + On("GetByHeight", mock.Anything). + Return(func(height uint64) (*types.Block, error) { + require.Equal(t, latestHeight+1, height) // make sure it gets next block + block := storageMock.NewBlock(height) + block.TransactionHashes = hashes + return block, nil + }) + + blocks. + On("GetCadenceID", mock.Anything). + Return(func(height uint64) (flow.Identifier, error) { + require.Equal(t, latestHeight+1, height) + return blockID, nil + }) + + downloadedHashes := make(map[gethCommon.Hash]struct{}) + downloader. + On("Download", mock.Anything, mock.Anything). + Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { + require.Equal(t, blockID, blkID) + downloadedHashes[txID] = struct{}{} + time.Sleep(time.Millisecond * 200) // simulate download delay + return txTrace(txID), nil + }) + + stored := make(chan gethCommon.Hash, len(hashes)) + trace. + On("StoreTransaction", mock.Anything, mock.Anything). + Return(func(ID gethCommon.Hash, trace json.RawMessage) error { + require.Equal(t, txTrace(ID), trace) + stored <- ID + return nil + }) + + engine := NewTracesIngestionEngine(latestHeight, blockBroadcaster, blocks, trace, downloader, zerolog.Nop()) + + err := engine.Run(context.Background()) + require.NoError(t, err) + + blockBroadcaster.Publish() + + // make sure stored was called as many times as block contained hashes + require.Eventuallyf(t, func() bool { + return len(stored) == len(hashes) + }, time.Second, time.Millisecond*50, "index not run") + + close(stored) + storedHashes := make([]string, 0) + for h := range stored { + storedHashes = append(storedHashes, h.String()) + } + + // make sure we stored all the hashes in the block + for _, h := range hashes { + require.True(t, slices.Contains(storedHashes, h.String())) + } + }) + + t.Run("successful multiple blocks ingestion", func(t *testing.T) { + blockBroadcaster := broadcast.NewBroadcaster() + blocks := &storageMock.BlockIndexer{} + trace := &storageMock.TraceIndexer{} + downloader := &mocks.Downloader{} + + txTrace := func(id gethCommon.Hash) json.RawMessage { + return json.RawMessage(fmt.Sprintf(`{ + "id": "%s", + "from":"0x42fdd562221741a1db62a0f69a5a680367f07e33", + "gas":"0x15f900", + "gasUsed":"0x387dc", + "to":"0xca11bde05977b3631167028862be2a173976ca11" + }`, id.String())) + } + + latestHeight := uint64(0) + + const blockCount = 10 + const txCount = 50 + + // generate mock blocks, each with mock transactions + mockBlocks := make([]*types.Block, blockCount+1) + mockCadenceIDs := make([]flow.Identifier, blockCount+1) + + for i := range mockBlocks { + b := storageMock.NewBlock(uint64(i)) + cid := flow.Identifier{byte(i + 10)} + + h := make([]gethCommon.Hash, txCount) + for j := range h { + h[j] = gethCommon.Hash{byte(j), byte(i)} + } + + b.TransactionHashes = h + mockBlocks[i] = b + mockCadenceIDs[i] = cid + } + + blocks. + On("GetByHeight", mock.Anything). + Return(func(height uint64) (*types.Block, error) { + latestHeight++ + require.Equal(t, latestHeight, height) // make sure it gets next block + require.Less(t, int(height), len(mockBlocks)) + return mockBlocks[height], nil + }) + + blocks. + On("GetCadenceID", mock.Anything). + Return(func(height uint64) (flow.Identifier, error) { + require.Equal(t, latestHeight, height) + require.Less(t, int(height), len(mockCadenceIDs)) + return mockCadenceIDs[height], nil + }) + + downloadedIDs := make(chan string, blockCount*txCount) + downloader. + On("Download", mock.Anything, mock.Anything). + Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { + id := fmt.Sprintf("%s-%s", blkID.String(), txID.String()) + downloadedIDs <- id + time.Sleep(time.Millisecond * 200) // simulate download delay + return txTrace(txID), nil + }) + + stored := make(chan gethCommon.Hash, blockCount*txCount) + trace. + On("StoreTransaction", mock.Anything, mock.Anything). + Return(func(ID gethCommon.Hash, trace json.RawMessage) error { + require.Equal(t, txTrace(ID), trace) + stored <- ID + return nil + }) + + engine := NewTracesIngestionEngine(latestHeight, blockBroadcaster, blocks, trace, downloader, zerolog.Nop()) + + err := engine.Run(context.Background()) + require.NoError(t, err) + + for i := 0; i < blockCount; i++ { + blockBroadcaster.Publish() + time.Sleep(time.Millisecond * 100) // simulate block delay + } + + // make sure download was called as many times as all blocks times the hashes it contained + require.Eventuallyf(t, func() bool { + return len(downloadedIDs) == blockCount*txCount + }, time.Second*10, time.Millisecond*100, "traces not downloaded") + + close(downloadedIDs) + + // make sure stored was called as many times as all blocks times the hashes it contained + require.Eventuallyf(t, func() bool { + return len(stored) == blockCount*txCount + }, time.Second*10, time.Millisecond*100, "traces not indexed") + + close(stored) + + // make sure we downloaded and indexed all the hashes in the block + for id := range downloadedIDs { + found := false + for _, b := range mockBlocks { + for _, h := range b.TransactionHashes { + txID := strings.Split(id, "-")[1] + if txID == h.String() { + found = true + break + } + } + if found { + break + } + } + require.True(t, found, fmt.Sprintf("id %s not found", id)) + } + }) + + t.Run("failed download retries", func(t *testing.T) { + blockBroadcaster := broadcast.NewBroadcaster() + blocks := &storageMock.BlockIndexer{} + downloader := &mocks.Downloader{} + trace := &storageMock.TraceIndexer{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + + latestHeight := uint64(0) + blockID := flow.Identifier{0x09} + hashes := []gethCommon.Hash{{0x1}} + + blocks. + On("GetByHeight", mock.Anything). + Return(func(height uint64) (*types.Block, error) { + require.Equal(t, latestHeight+1, height) // make sure it gets next block + block := storageMock.NewBlock(height) + block.TransactionHashes = hashes + return block, nil + }) + + blocks. + On("GetCadenceID", mock.Anything). + Return(func(height uint64) (flow.Identifier, error) { + require.Equal(t, latestHeight+1, height) + return blockID, nil + }) + + const retriesNum = 3 + downloads := make(chan struct{}, retriesNum) + downloader. + On("Download", mock.Anything, mock.Anything). + Return(func(txID gethCommon.Hash, blkID flow.Identifier) (json.RawMessage, error) { + downloads <- struct{}{} + return nil, fmt.Errorf("failed download") + }) + + engine := NewTracesIngestionEngine(latestHeight, blockBroadcaster, blocks, trace, downloader, logger) + + err := engine.Run(context.Background()) + require.NoError(t, err) + + blockBroadcaster.Publish() + + // make sure stored was called as many times as block contained hashes + require.Eventuallyf(t, func() bool { + return len(downloads) == retriesNum + }, time.Second*10, time.Millisecond*200, "download not retried") + + close(downloads) + }) +} diff --git a/services/traces/mocks/Downloader.go b/services/traces/mocks/Downloader.go new file mode 100644 index 00000000..a9e08614 --- /dev/null +++ b/services/traces/mocks/Downloader.go @@ -0,0 +1,58 @@ +// Code generated by mockery v2.21.4. DO NOT EDIT. + +package mocks + +import ( + flow "github.com/onflow/flow-go-sdk" + common "github.com/onflow/go-ethereum/common" + + json "encoding/json" + + mock "github.com/stretchr/testify/mock" +) + +// Downloader is an autogenerated mock type for the Downloader type +type Downloader struct { + mock.Mock +} + +// Download provides a mock function with given fields: txID, blockIO +func (_m *Downloader) Download(txID common.Hash, blockIO flow.Identifier) (json.RawMessage, error) { + ret := _m.Called(txID, blockIO) + + var r0 json.RawMessage + var r1 error + if rf, ok := ret.Get(0).(func(common.Hash, flow.Identifier) (json.RawMessage, error)); ok { + return rf(txID, blockIO) + } + if rf, ok := ret.Get(0).(func(common.Hash, flow.Identifier) json.RawMessage); ok { + r0 = rf(txID, blockIO) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(json.RawMessage) + } + } + + if rf, ok := ret.Get(1).(func(common.Hash, flow.Identifier) error); ok { + r1 = rf(txID, blockIO) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewDownloader interface { + mock.TestingT + Cleanup(func()) +} + +// NewDownloader creates a new instance of Downloader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewDownloader(t mockConstructorTestingTNewDownloader) *Downloader { + mock := &Downloader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/index.go b/storage/index.go index 0fae0a58..78f3e3f9 100644 --- a/storage/index.go +++ b/storage/index.go @@ -3,17 +3,20 @@ package storage import ( "math/big" - "github.com/onflow/flow-evm-gateway/models" + "github.com/goccy/go-json" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/types" "github.com/onflow/go-ethereum/common" gethTypes "github.com/onflow/go-ethereum/core/types" + + "github.com/onflow/flow-evm-gateway/models" ) type BlockIndexer interface { - // Store provided EVM block with the matching Cadence height. + // Store provided EVM block with the matching Cadence height and Cadence Block ID. // Expected errors: // - errors.Duplicate if the block already exists - Store(cadenceHeight uint64, block *types.Block) error + Store(cadenceHeight uint64, cadenceID flow.Identifier, block *types.Block) error // GetByHeight returns an EVM block stored by EVM height. // Expected errors: @@ -48,6 +51,13 @@ type BlockIndexer interface { // to the Cadence height. // - errors.NotFound if the height is not found GetCadenceHeight(evmHeight uint64) (uint64, error) + + // GetCadenceID returns the Cadence block ID that matches the + // provided EVM height. Each EVM block indexed contains a link to the + // Cadence block ID. Multiple EVM heights can point to the same + // Cadence block ID. + // - errors.NotFound if the height is not found + GetCadenceID(evmHeight uint64) (flow.Identifier, error) } type ReceiptIndexer interface { @@ -97,3 +107,10 @@ type AccountIndexer interface { // GetBalance gets an account balance. If no balance was indexed it returns 0. GetBalance(address *common.Address) (*big.Int, error) } + +type TraceIndexer interface { + // StoreTransaction will index transaction trace by the transaction ID. + StoreTransaction(ID common.Hash, trace json.RawMessage) error + // GetTransaction will retrieve transaction trace by the transaction ID. + GetTransaction(ID common.Hash) (json.RawMessage, error) +} diff --git a/storage/index_testsuite.go b/storage/index_testsuite.go index da073b95..e46b145e 100644 --- a/storage/index_testsuite.go +++ b/storage/index_testsuite.go @@ -4,14 +4,19 @@ import ( "fmt" "math/big" - "github.com/onflow/flow-evm-gateway/models" - "github.com/onflow/flow-evm-gateway/storage/errors" - "github.com/onflow/flow-evm-gateway/storage/mocks" + "github.com/goccy/go-json" evmEmulator "github.com/onflow/flow-go/fvm/evm/emulator" + goTypes "github.com/onflow/flow-go/fvm/evm/types" + + "github.com/onflow/flow-go-sdk" "github.com/onflow/go-ethereum/common" "github.com/onflow/go-ethereum/core/types" "github.com/onflow/go-ethereum/crypto" "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/onflow/flow-evm-gateway/storage/errors" + "github.com/onflow/flow-evm-gateway/storage/mocks" ) type BlockTestSuite struct { @@ -22,8 +27,9 @@ type BlockTestSuite struct { func (b *BlockTestSuite) TestGet() { b.Run("existing block", func() { height := uint64(1) + flowID := flow.Identifier{0x01} block := mocks.NewBlock(height) - err := b.Blocks.Store(height+1, block) + err := b.Blocks.Store(height+1, flowID, block) b.Require().NoError(err) ID, err := block.Hash() @@ -55,17 +61,18 @@ func (b *BlockTestSuite) TestStore() { block := mocks.NewBlock(10) b.Run("success", func() { - err := b.Blocks.Store(2, block) + flowID := flow.Identifier{0x01} + err := b.Blocks.Store(2, flowID, block) b.Require().NoError(err) // we allow overwriting blocks to make the actions idempotent - err = b.Blocks.Store(2, block) + err = b.Blocks.Store(2, flowID, block) b.Require().NoError(err) }) b.Run("store multiple blocks, and get one", func() { for i := 0; i < 10; i++ { - err := b.Blocks.Store(uint64(i+5), mocks.NewBlock(uint64(10+i))) + err := b.Blocks.Store(uint64(i+5), flow.Identifier{byte(i)}, mocks.NewBlock(uint64(10+i))) b.Require().NoError(err) } @@ -85,19 +92,43 @@ func (b *BlockTestSuite) TestHeights() { b.Run("last EVM height", func() { for i := 0; i < 5; i++ { lastHeight := uint64(100 + i) - err := b.Blocks.Store(lastHeight+10, mocks.NewBlock(lastHeight)) + err := b.Blocks.Store(lastHeight+10, flow.Identifier{byte(i)}, mocks.NewBlock(lastHeight)) b.Require().NoError(err) last, err := b.Blocks.LatestEVMHeight() b.Require().NoError(err) b.Require().Equal(lastHeight, last) + + last, err = b.Blocks.LatestEVMHeight() // second time it should get it from cache + b.Require().NoError(err) + b.Require().Equal(lastHeight, last) + } + }) + + b.Run("get height by ID", func() { + evmHeights := []uint64{10, 11, 12, 13} + cadenceIDs := []flow.Identifier{{0x01}, {0x02}, {0x03}, {0x04}} + blocks := make([]*goTypes.Block, 4) + + for i, evmHeight := range evmHeights { + blocks[i] = mocks.NewBlock(evmHeight) + err := b.Blocks.Store(uint64(i), cadenceIDs[i], blocks[i]) + b.Require().NoError(err) + } + + for i := range evmHeights { + id, err := blocks[i].Hash() + b.Require().NoError(err) + evm, err := b.Blocks.GetHeightByID(id) + b.Require().NoError(err) + b.Assert().Equal(evmHeights[i], evm) } }) b.Run("last Cadence height", func() { for i := 0; i < 5; i++ { lastHeight := uint64(100 + i) - err := b.Blocks.Store(lastHeight, mocks.NewBlock(lastHeight-10)) + err := b.Blocks.Store(lastHeight, flow.Identifier{byte(i)}, mocks.NewBlock(lastHeight-10)) b.Require().NoError(err) last, err := b.Blocks.LatestCadenceHeight() @@ -110,7 +141,7 @@ func (b *BlockTestSuite) TestHeights() { evmHeights := []uint64{10, 11, 12, 13} cadenceHeights := []uint64{20, 24, 26, 27} for i, evmHeight := range evmHeights { - err := b.Blocks.Store(cadenceHeights[i], mocks.NewBlock(evmHeight)) + err := b.Blocks.Store(cadenceHeights[i], flow.Identifier{byte(i)}, mocks.NewBlock(evmHeight)) b.Require().NoError(err) } @@ -120,6 +151,21 @@ func (b *BlockTestSuite) TestHeights() { b.Assert().Equal(cadenceHeights[i], cadence) } }) + + b.Run("Cadence ID from EVM height", func() { + evmHeights := []uint64{10, 11, 12, 13} + cadenceIDs := []flow.Identifier{{0x01}, {0x02}, {0x03}, {0x04}} + for i, evmHeight := range evmHeights { + err := b.Blocks.Store(uint64(i), cadenceIDs[i], mocks.NewBlock(evmHeight)) + b.Require().NoError(err) + } + + for i, evmHeight := range evmHeights { + cadence, err := b.Blocks.GetCadenceID(evmHeight) + b.Require().NoError(err) + b.Assert().Equal(cadenceIDs[i], cadence) + } + }) } type ReceiptTestSuite struct { @@ -411,3 +457,47 @@ func (a *AccountTestSuite) TestNonce() { } }) } + +type TraceTestSuite struct { + suite.Suite + TraceIndexer TraceIndexer +} + +func (s *TraceTestSuite) TestStore() { + s.Run("store new trace", func() { + id := common.Hash{0x01} + trace := json.RawMessage(`{ "test": "foo" }`) + err := s.TraceIndexer.StoreTransaction(id, trace) + s.Require().NoError(err) + }) + + s.Run("overwrite existing trace", func() { + for i := 0; i < 2; i++ { + id := common.Hash{0x01} + trace := json.RawMessage(`{ "test": "foo" }`) + err := s.TraceIndexer.StoreTransaction(id, trace) + s.Require().NoError(err) + } + }) +} + +func (s *TraceTestSuite) TestGet() { + s.Run("get existing trace", func() { + id := common.Hash{0x01} + trace := json.RawMessage(`{ "test": "foo" }`) + + err := s.TraceIndexer.StoreTransaction(id, trace) + s.Require().NoError(err) + + val, err := s.TraceIndexer.GetTransaction(id) + s.Require().NoError(err) + s.Require().Equal(trace, val) + }) + + s.Run("get not found trace", func() { + id := common.Hash{0x02} + val, err := s.TraceIndexer.GetTransaction(id) + s.Require().ErrorIs(err, errors.ErrNotFound) + s.Require().Nil(val) + }) +} diff --git a/storage/mocks/AccountIndexer.go b/storage/mocks/AccountIndexer.go index e949b1ee..fe241914 100644 --- a/storage/mocks/AccountIndexer.go +++ b/storage/mocks/AccountIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks @@ -22,10 +22,6 @@ type AccountIndexer struct { func (_m *AccountIndexer) GetBalance(address *common.Address) (*big.Int, error) { ret := _m.Called(address) - if len(ret) == 0 { - panic("no return value specified for GetBalance") - } - var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(*common.Address) (*big.Int, error)); ok { @@ -52,10 +48,6 @@ func (_m *AccountIndexer) GetBalance(address *common.Address) (*big.Int, error) func (_m *AccountIndexer) GetNonce(address *common.Address) (uint64, error) { ret := _m.Called(address) - if len(ret) == 0 { - panic("no return value specified for GetNonce") - } - var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(*common.Address) (uint64, error)); ok { @@ -80,10 +72,6 @@ func (_m *AccountIndexer) GetNonce(address *common.Address) (uint64, error) { func (_m *AccountIndexer) Update(tx models.Transaction, receipt *types.Receipt) error { ret := _m.Called(tx, receipt) - if len(ret) == 0 { - panic("no return value specified for Update") - } - var r0 error if rf, ok := ret.Get(0).(func(models.Transaction, *types.Receipt) error); ok { r0 = rf(tx, receipt) @@ -94,12 +82,13 @@ func (_m *AccountIndexer) Update(tx models.Transaction, receipt *types.Receipt) return r0 } -// NewAccountIndexer creates a new instance of AccountIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewAccountIndexer(t interface { +type mockConstructorTestingTNewAccountIndexer interface { mock.TestingT Cleanup(func()) -}) *AccountIndexer { +} + +// NewAccountIndexer creates a new instance of AccountIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAccountIndexer(t mockConstructorTestingTNewAccountIndexer) *AccountIndexer { mock := &AccountIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/BlockIndexer.go b/storage/mocks/BlockIndexer.go index 9a77ee5f..6f6dba45 100644 --- a/storage/mocks/BlockIndexer.go +++ b/storage/mocks/BlockIndexer.go @@ -1,9 +1,11 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks import ( + flow "github.com/onflow/flow-go-sdk" common "github.com/onflow/go-ethereum/common" + mock "github.com/stretchr/testify/mock" types "github.com/onflow/flow-go/fvm/evm/types" @@ -18,10 +20,6 @@ type BlockIndexer struct { func (_m *BlockIndexer) GetByHeight(height uint64) (*types.Block, error) { ret := _m.Called(height) - if len(ret) == 0 { - panic("no return value specified for GetByHeight") - } - var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(uint64) (*types.Block, error)); ok { @@ -48,10 +46,6 @@ func (_m *BlockIndexer) GetByHeight(height uint64) (*types.Block, error) { func (_m *BlockIndexer) GetByID(ID common.Hash) (*types.Block, error) { ret := _m.Called(ID) - if len(ret) == 0 { - panic("no return value specified for GetByID") - } - var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (*types.Block, error)); ok { @@ -78,10 +72,6 @@ func (_m *BlockIndexer) GetByID(ID common.Hash) (*types.Block, error) { func (_m *BlockIndexer) GetCadenceHeight(evmHeight uint64) (uint64, error) { ret := _m.Called(evmHeight) - if len(ret) == 0 { - panic("no return value specified for GetCadenceHeight") - } - var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(uint64) (uint64, error)); ok { @@ -102,14 +92,36 @@ func (_m *BlockIndexer) GetCadenceHeight(evmHeight uint64) (uint64, error) { return r0, r1 } +// GetCadenceID provides a mock function with given fields: evmHeight +func (_m *BlockIndexer) GetCadenceID(evmHeight uint64) (flow.Identifier, error) { + ret := _m.Called(evmHeight) + + var r0 flow.Identifier + var r1 error + if rf, ok := ret.Get(0).(func(uint64) (flow.Identifier, error)); ok { + return rf(evmHeight) + } + if rf, ok := ret.Get(0).(func(uint64) flow.Identifier); ok { + r0 = rf(evmHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(flow.Identifier) + } + } + + if rf, ok := ret.Get(1).(func(uint64) error); ok { + r1 = rf(evmHeight) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetHeightByID provides a mock function with given fields: ID func (_m *BlockIndexer) GetHeightByID(ID common.Hash) (uint64, error) { ret := _m.Called(ID) - if len(ret) == 0 { - panic("no return value specified for GetHeightByID") - } - var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (uint64, error)); ok { @@ -134,10 +146,6 @@ func (_m *BlockIndexer) GetHeightByID(ID common.Hash) (uint64, error) { func (_m *BlockIndexer) LatestCadenceHeight() (uint64, error) { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for LatestCadenceHeight") - } - var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func() (uint64, error)); ok { @@ -162,10 +170,6 @@ func (_m *BlockIndexer) LatestCadenceHeight() (uint64, error) { func (_m *BlockIndexer) LatestEVMHeight() (uint64, error) { ret := _m.Called() - if len(ret) == 0 { - panic("no return value specified for LatestEVMHeight") - } - var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func() (uint64, error)); ok { @@ -190,10 +194,6 @@ func (_m *BlockIndexer) LatestEVMHeight() (uint64, error) { func (_m *BlockIndexer) SetLatestCadenceHeight(height uint64) error { ret := _m.Called(height) - if len(ret) == 0 { - panic("no return value specified for SetLatestCadenceHeight") - } - var r0 error if rf, ok := ret.Get(0).(func(uint64) error); ok { r0 = rf(height) @@ -204,17 +204,13 @@ func (_m *BlockIndexer) SetLatestCadenceHeight(height uint64) error { return r0 } -// Store provides a mock function with given fields: cadenceHeight, block -func (_m *BlockIndexer) Store(cadenceHeight uint64, block *types.Block) error { - ret := _m.Called(cadenceHeight, block) - - if len(ret) == 0 { - panic("no return value specified for Store") - } +// Store provides a mock function with given fields: cadenceHeight, cadenceID, block +func (_m *BlockIndexer) Store(cadenceHeight uint64, cadenceID flow.Identifier, block *types.Block) error { + ret := _m.Called(cadenceHeight, cadenceID, block) var r0 error - if rf, ok := ret.Get(0).(func(uint64, *types.Block) error); ok { - r0 = rf(cadenceHeight, block) + if rf, ok := ret.Get(0).(func(uint64, flow.Identifier, *types.Block) error); ok { + r0 = rf(cadenceHeight, cadenceID, block) } else { r0 = ret.Error(0) } @@ -222,12 +218,13 @@ func (_m *BlockIndexer) Store(cadenceHeight uint64, block *types.Block) error { return r0 } -// NewBlockIndexer creates a new instance of BlockIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewBlockIndexer(t interface { +type mockConstructorTestingTNewBlockIndexer interface { mock.TestingT Cleanup(func()) -}) *BlockIndexer { +} + +// NewBlockIndexer creates a new instance of BlockIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewBlockIndexer(t mockConstructorTestingTNewBlockIndexer) *BlockIndexer { mock := &BlockIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/ReceiptIndexer.go b/storage/mocks/ReceiptIndexer.go index 1f94595f..638b369b 100644 --- a/storage/mocks/ReceiptIndexer.go +++ b/storage/mocks/ReceiptIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks @@ -20,10 +20,6 @@ type ReceiptIndexer struct { func (_m *ReceiptIndexer) BloomsForBlockRange(start *big.Int, end *big.Int) ([]*types.Bloom, []*big.Int, error) { ret := _m.Called(start, end) - if len(ret) == 0 { - panic("no return value specified for BloomsForBlockRange") - } - var r0 []*types.Bloom var r1 []*big.Int var r2 error @@ -59,10 +55,6 @@ func (_m *ReceiptIndexer) BloomsForBlockRange(start *big.Int, end *big.Int) ([]* func (_m *ReceiptIndexer) GetByBlockHeight(height *big.Int) ([]*types.Receipt, error) { ret := _m.Called(height) - if len(ret) == 0 { - panic("no return value specified for GetByBlockHeight") - } - var r0 []*types.Receipt var r1 error if rf, ok := ret.Get(0).(func(*big.Int) ([]*types.Receipt, error)); ok { @@ -89,10 +81,6 @@ func (_m *ReceiptIndexer) GetByBlockHeight(height *big.Int) ([]*types.Receipt, e func (_m *ReceiptIndexer) GetByTransactionID(ID common.Hash) (*types.Receipt, error) { ret := _m.Called(ID) - if len(ret) == 0 { - panic("no return value specified for GetByTransactionID") - } - var r0 *types.Receipt var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (*types.Receipt, error)); ok { @@ -119,10 +107,6 @@ func (_m *ReceiptIndexer) GetByTransactionID(ID common.Hash) (*types.Receipt, er func (_m *ReceiptIndexer) Store(receipt *types.Receipt) error { ret := _m.Called(receipt) - if len(ret) == 0 { - panic("no return value specified for Store") - } - var r0 error if rf, ok := ret.Get(0).(func(*types.Receipt) error); ok { r0 = rf(receipt) @@ -133,12 +117,13 @@ func (_m *ReceiptIndexer) Store(receipt *types.Receipt) error { return r0 } -// NewReceiptIndexer creates a new instance of ReceiptIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewReceiptIndexer(t interface { +type mockConstructorTestingTNewReceiptIndexer interface { mock.TestingT Cleanup(func()) -}) *ReceiptIndexer { +} + +// NewReceiptIndexer creates a new instance of ReceiptIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewReceiptIndexer(t mockConstructorTestingTNewReceiptIndexer) *ReceiptIndexer { mock := &ReceiptIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/TraceIndexer.go b/storage/mocks/TraceIndexer.go new file mode 100644 index 00000000..c063844c --- /dev/null +++ b/storage/mocks/TraceIndexer.go @@ -0,0 +1,71 @@ +// Code generated by mockery v2.21.4. DO NOT EDIT. + +package mocks + +import ( + json "encoding/json" + + common "github.com/onflow/go-ethereum/common" + + mock "github.com/stretchr/testify/mock" +) + +// TraceIndexer is an autogenerated mock type for the TraceIndexer type +type TraceIndexer struct { + mock.Mock +} + +// GetTransaction provides a mock function with given fields: ID +func (_m *TraceIndexer) GetTransaction(ID common.Hash) (json.RawMessage, error) { + ret := _m.Called(ID) + + var r0 json.RawMessage + var r1 error + if rf, ok := ret.Get(0).(func(common.Hash) (json.RawMessage, error)); ok { + return rf(ID) + } + if rf, ok := ret.Get(0).(func(common.Hash) json.RawMessage); ok { + r0 = rf(ID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(json.RawMessage) + } + } + + if rf, ok := ret.Get(1).(func(common.Hash) error); ok { + r1 = rf(ID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StoreTransaction provides a mock function with given fields: ID, trace +func (_m *TraceIndexer) StoreTransaction(ID common.Hash, trace json.RawMessage) error { + ret := _m.Called(ID, trace) + + var r0 error + if rf, ok := ret.Get(0).(func(common.Hash, json.RawMessage) error); ok { + r0 = rf(ID, trace) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewTraceIndexer interface { + mock.TestingT + Cleanup(func()) +} + +// NewTraceIndexer creates a new instance of TraceIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTraceIndexer(t mockConstructorTestingTNewTraceIndexer) *TraceIndexer { + mock := &TraceIndexer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mocks/TransactionIndexer.go b/storage/mocks/TransactionIndexer.go index f3d23e8a..0273b005 100644 --- a/storage/mocks/TransactionIndexer.go +++ b/storage/mocks/TransactionIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.21.4. DO NOT EDIT. package mocks @@ -18,10 +18,6 @@ type TransactionIndexer struct { func (_m *TransactionIndexer) Get(ID common.Hash) (models.Transaction, error) { ret := _m.Called(ID) - if len(ret) == 0 { - panic("no return value specified for Get") - } - var r0 models.Transaction var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (models.Transaction, error)); ok { @@ -48,10 +44,6 @@ func (_m *TransactionIndexer) Get(ID common.Hash) (models.Transaction, error) { func (_m *TransactionIndexer) Store(tx models.Transaction) error { ret := _m.Called(tx) - if len(ret) == 0 { - panic("no return value specified for Store") - } - var r0 error if rf, ok := ret.Get(0).(func(models.Transaction) error); ok { r0 = rf(tx) @@ -62,12 +54,13 @@ func (_m *TransactionIndexer) Store(tx models.Transaction) error { return r0 } -// NewTransactionIndexer creates a new instance of TransactionIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewTransactionIndexer(t interface { +type mockConstructorTestingTNewTransactionIndexer interface { mock.TestingT Cleanup(func()) -}) *TransactionIndexer { +} + +// NewTransactionIndexer creates a new instance of TransactionIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewTransactionIndexer(t mockConstructorTestingTNewTransactionIndexer) *TransactionIndexer { mock := &TransactionIndexer{} mock.Mock.Test(t) diff --git a/storage/pebble/blocks.go b/storage/pebble/blocks.go index f03ebf32..adbd2c56 100644 --- a/storage/pebble/blocks.go +++ b/storage/pebble/blocks.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/cockroachdb/pebble" + "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go/fvm/evm/types" "github.com/onflow/go-ethereum/common" @@ -31,7 +32,7 @@ func NewBlocks(store *Storage) *Blocks { } } -func (b *Blocks) Store(cadenceHeight uint64, block *types.Block) error { +func (b *Blocks) Store(cadenceHeight uint64, cadenceID flow.Identifier, block *types.Block) error { b.mux.Lock() defer b.mux.Unlock() @@ -60,11 +61,16 @@ func (b *Blocks) Store(cadenceHeight uint64, block *types.Block) error { return fmt.Errorf("failed to store block: %w", err) } - // set latest height + // set mapping of evm height to cadence block height if err := b.store.set(evmHeightToCadenceHeightKey, evmHeightBytes, cadenceHeightBytes, batch); err != nil { return fmt.Errorf("failed to store evm to cadence height: %w", err) } + // set mapping of evm height to cadence block id + if err := b.store.set(evmHeightToCadenceIDKey, evmHeightBytes, cadenceID.Bytes(), batch); err != nil { + return fmt.Errorf("failed to store evm to cadence id: %w", err) + } + if err := b.store.set(latestCadenceHeightKey, nil, cadenceHeightBytes, batch); err != nil { return fmt.Errorf("failed to store latest cadence height: %w", err) } @@ -184,7 +190,7 @@ func (b *Blocks) SetLatestCadenceHeight(height uint64) error { } // InitHeights sets the Cadence height to zero as well as EVM heights. Used for empty database init. -func (b *Blocks) InitHeights(cadenceHeight uint64) error { +func (b *Blocks) InitHeights(cadenceHeight uint64, cadenceID flow.Identifier) error { // sanity check, make sure we don't have any heights stored, disable overwriting the database _, err := b.LatestEVMHeight() if !errors.Is(err, errs.ErrNotInitialized) { @@ -200,7 +206,7 @@ func (b *Blocks) InitHeights(cadenceHeight uint64) error { } // we store genesis block because it isn't emitted over the network - if err := b.Store(cadenceHeight, types.GenesisBlock); err != nil { + if err := b.Store(cadenceHeight, cadenceID, types.GenesisBlock); err != nil { return fmt.Errorf("faield to set init genesis block: %w", err) } @@ -219,6 +225,18 @@ func (b *Blocks) GetCadenceHeight(evmHeight uint64) (uint64, error) { return binary.BigEndian.Uint64(val), nil } +func (b *Blocks) GetCadenceID(evmHeight uint64) (flow.Identifier, error) { + b.mux.RLock() + defer b.mux.RUnlock() + + val, err := b.store.get(evmHeightToCadenceIDKey, uint64Bytes(evmHeight)) + if err != nil { + return flow.Identifier{}, err + } + + return flow.BytesToID(val), nil +} + func (b *Blocks) getBlock(keyCode byte, key []byte) (*types.Block, error) { data, err := b.store.get(keyCode, key) if err != nil { diff --git a/storage/pebble/keys.go b/storage/pebble/keys.go index 67b6a5df..f53ebd85 100644 --- a/storage/pebble/keys.go +++ b/storage/pebble/keys.go @@ -7,6 +7,7 @@ const ( blockHeightKey = byte(1) blockIDToHeightKey = byte(2) evmHeightToCadenceHeightKey = byte(3) + evmHeightToCadenceIDKey = byte(4) // transaction keys txIDKey = byte(10) @@ -20,6 +21,9 @@ const ( accountNonceKey = byte(30) accountBalanceKey = byte(31) + // traces keys + traceTxIDKey = byte(40) + // special keys latestEVMHeightKey = byte(100) latestCadenceHeightKey = byte(102) diff --git a/storage/pebble/storage_test.go b/storage/pebble/storage_test.go index 8fe12a0e..a9ad1d16 100644 --- a/storage/pebble/storage_test.go +++ b/storage/pebble/storage_test.go @@ -3,22 +3,24 @@ package pebble import ( "testing" - "github.com/onflow/flow-evm-gateway/config" - "github.com/onflow/flow-evm-gateway/storage" - "github.com/onflow/flow-evm-gateway/storage/errors" - "github.com/onflow/flow-evm-gateway/storage/mocks" + "github.com/onflow/flow-go-sdk" "github.com/onflow/go-ethereum/common" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-evm-gateway/config" + "github.com/onflow/flow-evm-gateway/storage" + "github.com/onflow/flow-evm-gateway/storage/errors" + "github.com/onflow/flow-evm-gateway/storage/mocks" ) // tests that make sure the implementation conform to the interface expected behaviour func TestBlocks(t *testing.T) { runDB("blocks", t, func(t *testing.T, db *Storage) { bl := NewBlocks(db) - err := bl.InitHeights(config.EmulatorInitCadenceHeight) + err := bl.InitHeights(config.EmulatorInitCadenceHeight, flow.Identifier{0x1}) require.NoError(t, err) suite.Run(t, &storage.BlockTestSuite{Blocks: bl}) }) @@ -28,11 +30,11 @@ func TestReceipts(t *testing.T) { runDB("receipts", t, func(t *testing.T, db *Storage) { // prepare the blocks database since they track heights which are used in receipts as well bl := NewBlocks(db) - err := bl.InitHeights(config.EmulatorInitCadenceHeight) + err := bl.InitHeights(config.EmulatorInitCadenceHeight, flow.Identifier{0x1}) require.NoError(t, err) - err = bl.Store(30, mocks.NewBlock(10)) // update first and latest height + err = bl.Store(30, flow.Identifier{0x1}, mocks.NewBlock(10)) // update first and latest height require.NoError(t, err) - err = bl.Store(30, mocks.NewBlock(30)) // update latest + err = bl.Store(30, flow.Identifier{0x1}, mocks.NewBlock(30)) // update latest require.NoError(t, err) suite.Run(t, &storage.ReceiptTestSuite{ReceiptIndexer: NewReceipts(db)}) @@ -51,27 +53,35 @@ func TestAccounts(t *testing.T) { }) } +func TestTraces(t *testing.T) { + runDB("traces", t, func(t *testing.T, db *Storage) { + suite.Run(t, &storage.TraceTestSuite{TraceIndexer: NewTraces(db)}) + }) +} + func TestBlock(t *testing.T) { runDB("store block", t, func(t *testing.T, db *Storage) { bl := mocks.NewBlock(10) blocks := NewBlocks(db) - err := blocks.InitHeights(config.EmulatorInitCadenceHeight) + err := blocks.InitHeights(config.EmulatorInitCadenceHeight, flow.Identifier{0x1}) require.NoError(t, err) - err = blocks.Store(20, bl) + err = blocks.Store(20, flow.Identifier{0x1}, bl) require.NoError(t, err) }) runDB("get stored block", t, func(t *testing.T, db *Storage) { const height = uint64(12) + cadenceID := flow.Identifier{0x1} + cadenceHeight := uint64(20) bl := mocks.NewBlock(height) blocks := NewBlocks(db) - err := blocks.InitHeights(config.EmulatorInitCadenceHeight) + err := blocks.InitHeights(config.EmulatorInitCadenceHeight, flow.Identifier{0x1}) require.NoError(t, err) - err = blocks.Store(30, bl) + err = blocks.Store(cadenceHeight, cadenceID, bl) require.NoError(t, err) block, err := blocks.GetByHeight(height) @@ -84,13 +94,21 @@ func TestBlock(t *testing.T) { block, err = blocks.GetByID(id) require.NoError(t, err) assert.Equal(t, bl, block) + + h, err := blocks.GetCadenceHeight(height) + require.NoError(t, err) + require.Equal(t, cadenceHeight, h) + + cid, err := blocks.GetCadenceID(height) + require.NoError(t, err) + require.Equal(t, cadenceID, cid) }) runDB("get not found block error", t, func(t *testing.T, db *Storage) { blocks := NewBlocks(db) - err := blocks.InitHeights(config.EmulatorInitCadenceHeight) + err := blocks.InitHeights(config.EmulatorInitCadenceHeight, flow.Identifier{0x1}) require.NoError(t, err) - _ = blocks.Store(2, mocks.NewBlock(1)) // init + _ = blocks.Store(2, flow.Identifier{0x1}, mocks.NewBlock(1)) // init bl, err := blocks.GetByHeight(11) require.ErrorIs(t, err, errors.ErrNotFound) diff --git a/storage/pebble/traces.go b/storage/pebble/traces.go new file mode 100644 index 00000000..c974808b --- /dev/null +++ b/storage/pebble/traces.go @@ -0,0 +1,48 @@ +package pebble + +import ( + "fmt" + "sync" + + "github.com/goccy/go-json" + "github.com/onflow/go-ethereum/common" + + "github.com/onflow/flow-evm-gateway/storage" +) + +var _ storage.TraceIndexer = &Traces{} + +type Traces struct { + store *Storage + mux sync.RWMutex +} + +func NewTraces(store *Storage) *Traces { + return &Traces{ + store: store, + mux: sync.RWMutex{}, + } +} + +func (t *Traces) StoreTransaction(ID common.Hash, trace json.RawMessage) error { + t.mux.Lock() + defer t.mux.Unlock() + + if err := t.store.set(traceTxIDKey, ID.Bytes(), trace, nil); err != nil { + return fmt.Errorf("failed to store trace for transaction ID %s: %w", ID.String(), err) + } + + return nil +} + +func (t *Traces) GetTransaction(ID common.Hash) (json.RawMessage, error) { + t.mux.RLock() + defer t.mux.RUnlock() + + val, err := t.store.get(traceTxIDKey, ID.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to get trace by ID %s: %w", ID.String(), err) + } + + return val, nil +} diff --git a/tests/go.mod b/tests/go.mod index fca7bf8a..72dd3695 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -15,6 +15,11 @@ require ( ) require ( + cloud.google.com/go v0.112.0 // indirect + cloud.google.com/go/compute v1.24.0 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v1.1.6 // indirect + cloud.google.com/go/storage v1.36.0 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/SaveTheRbtz/mph v0.1.1-0.20240117162131-4166ec7869bc // indirect @@ -51,6 +56,7 @@ require ( github.com/ef-ds/deque v1.0.4 // indirect github.com/ethereum/c-kzg-4844 v0.4.0 // indirect github.com/ethereum/go-ethereum v1.13.10 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fxamacker/cbor/v2 v2.4.1-0.20230228173756-c0c9f774e40c // indirect github.com/fxamacker/circlehash v0.3.0 // indirect @@ -68,10 +74,14 @@ require ( github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/go-dap v0.11.0 // indirect + github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect @@ -186,6 +196,9 @@ require ( github.com/vmihailenco/tagparser v0.1.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/blake3 v0.2.3 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect @@ -200,6 +213,7 @@ require ( golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.21.0 // indirect + golang.org/x/oauth2 v0.17.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/term v0.17.0 // indirect @@ -208,7 +222,9 @@ require ( golang.org/x/tools v0.17.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.14.0 // indirect + google.golang.org/api v0.162.0 // indirect google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/grpc v1.63.2 // indirect diff --git a/tests/go.sum b/tests/go.sum index c492b29c..84eeaa7e 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1149,6 +1149,8 @@ github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230428030218-4003588d1b74/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v1.0.0/go.mod h1:5Ib8Meh+jk1RlHIXej6Pzevx/NLlNvQB9pmSBZErGA4= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= @@ -1298,6 +1300,8 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs= github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY= github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= @@ -1556,10 +1560,12 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= +github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -1605,7 +1611,6 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/googleapis/gax-go v0.0.0-20161107002406-da06d194a00e h1:CYRpN206UTHUinz3VJoLaBdy1gEGeJNsqT0mvswDcMw= github.com/googleapis/gax-go v0.0.0-20161107002406-da06d194a00e/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/tests/helpers.go b/tests/helpers.go index e1d7ca2b..86ccccd1 100644 --- a/tests/helpers.go +++ b/tests/helpers.go @@ -171,6 +171,12 @@ func executeTest(t *testing.T, testFile string) { parts := strings.Fields(command) t.Run(testFile, func(t *testing.T) { + // timeout for tests + go func() { + time.Sleep(time.Minute * 2) + t.FailNow() + }() + cmd := exec.Command(parts[0], parts[1:]...) if cmd.Err != nil { panic(cmd.Err)