Skip to content

Commit

Permalink
Storage fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Nov 7, 2024
1 parent f8802d3 commit beedd0f
Show file tree
Hide file tree
Showing 17 changed files with 384 additions and 681 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ generate:
mockery --dir=storage --name=BlockIndexer --output=storage/mocks
mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks
mockery --dir=storage --name=TransactionIndexer --output=storage/mocks
mockery --dir=storage --name=AccountIndexer --output=storage/mocks
mockery --dir=storage --name=TraceIndexer --output=storage/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks
Expand Down
16 changes: 0 additions & 16 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ type BlockChainAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
accounts storage.AccountIndexer
indexingResumedHeight uint64
limiter limiter.Store
collector metrics.Collector
Expand All @@ -166,7 +165,6 @@ func NewBlockChainAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
accounts storage.AccountIndexer,
ratelimiter limiter.Store,
collector metrics.Collector,
) (*BlockChainAPI, error) {
Expand All @@ -183,7 +181,6 @@ func NewBlockChainAPI(
blocks: blocks,
transactions: transactions,
receipts: receipts,
accounts: accounts,
indexingResumedHeight: indexingResumedHeight,
limiter: ratelimiter,
collector: collector,
Expand Down Expand Up @@ -761,19 +758,6 @@ func (b *BlockChainAPI) GetTransactionCount(
return handleError[*hexutil.Uint64](err, l, b.collector)
}

nonce, err := b.accounts.GetNonce(address)
if err != nil {
return handleError[*hexutil.Uint64](errs.ErrInternal, l, b.collector)
}

// compare both until we gain confidence in db nonce tracking working correctly
if nonce != networkNonce {
l.Error().
Uint64("network-nonce", networkNonce).
Uint64("db-nonce", nonce).
Msg("network nonce does not equal db nonce")
}

return (*hexutil.Uint64)(&networkNonce), nil
}

Expand Down
24 changes: 17 additions & 7 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type Storages struct {
Blocks storage.BlockIndexer
Transactions storage.TransactionIndexer
Receipts storage.ReceiptIndexer
Accounts storage.AccountIndexer
Traces storage.TraceIndexer
}

Expand Down Expand Up @@ -156,7 +155,6 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.storages.Blocks,
b.storages.Receipts,
b.storages.Transactions,
b.storages.Accounts,
b.storages.Traces,
b.publishers.Block,
b.publishers.Logs,
Expand Down Expand Up @@ -256,7 +254,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.storages.Blocks,
b.storages.Transactions,
b.storages.Receipts,
b.storages.Accounts,
ratelimiter,
b.collector,
)
Expand Down Expand Up @@ -393,6 +390,16 @@ func (b *Bootstrap) StopProfilerServer() {
}
}

func (b *Bootstrap) StopDB() {
if b.storages == nil || b.storages.Storage == nil {
return
}
err := b.storages.Storage.Close()
if err != nil {
b.logger.Err(err).Msg("PebbleDB graceful shutdown failed")
}
}

// StartEngine starts provided engine and panics if there are startup errors.
func StartEngine(
ctx context.Context,
Expand Down Expand Up @@ -493,12 +500,13 @@ func setupStorage(
}(batch)

cadenceHeight := config.InitCadenceHeight
evmBlokcHeight := uint64(0)
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
}

snapshot, err := registerStore.GetSnapshotAt(0)
snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight)
if err != nil {
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
}
Expand All @@ -514,7 +522,7 @@ func setupStorage(
return nil, fmt.Errorf("could not set account status: %w", err)
}

err = registerStore.Store(delta.GetUpdates(), cadenceHeight, batch)
err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch)
if err != nil {
return nil, fmt.Errorf("could not store register updates: %w", err)
}
Expand All @@ -533,7 +541,9 @@ func setupStorage(
return nil, fmt.Errorf("could not commit register updates: %w", err)
}

logger.Info().Msgf("database initialized with cadence height: %d", cadenceHeight)
logger.Info().
Stringer("fvm_address_for_evm_storage_account", storageAddress).
Msgf("database initialized with cadence height: %d", cadenceHeight)
}
//else {
// // TODO(JanezP): verify storage account owner is correct
Expand All @@ -545,7 +555,6 @@ func setupStorage(
Registers: registerStore,
Transactions: pebble.NewTransactions(store),
Receipts: pebble.NewReceipts(store),
Accounts: pebble.NewAccounts(store),
Traces: pebble.NewTraces(store),
}, nil
}
Expand Down Expand Up @@ -585,6 +594,7 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error {
boot.StopEventIngestion()
boot.StopMetricsServer()
boot.StopAPIServer()
boot.StopDB()

return nil
}
30 changes: 18 additions & 12 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingestion
import (
"context"
"fmt"

Check failure on line 5 in services/ingestion/engine.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `goimports`-ed (goimports)

flowGo "github.com/onflow/flow-go/model/flow"

pebbleDB "github.com/cockroachdb/pebble"
Expand Down Expand Up @@ -46,7 +45,6 @@ type Engine struct {
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
accounts storage.AccountIndexer
traces storage.TraceIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
Expand All @@ -64,7 +62,6 @@ func NewEventIngestionEngine(
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
transactions storage.TransactionIndexer,
accounts storage.AccountIndexer,
traces storage.TraceIndexer,
blocksPublisher *models.Publisher[*models.Block],
logsPublisher *models.Publisher[[]*gethTypes.Log],
Expand All @@ -84,7 +81,6 @@ func NewEventIngestionEngine(
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
traces: traces,
log: log,
blocksPublisher: blocksPublisher,
Expand Down Expand Up @@ -120,14 +116,21 @@ func (e *Engine) Run(ctx context.Context) error {
e.MarkReady()

for events := range e.subscriber.Subscribe(ctx) {
select {
case <-ctx.Done():
// stop the engine
return nil
default:
}

if events.Err != nil {
return fmt.Errorf(
"failure in event subscription with: %w",
events.Err,
)
}

err := e.processEvents(events.Events)
err := e.processEvents(ctx, events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
Expand All @@ -149,7 +152,7 @@ func (e *Engine) Run(ctx context.Context) error {
// https://github.com/onflow/flow-go/blob/master/fvm/evm/types/events.go
//
// Any error is unexpected and fatal.
func (e *Engine) processEvents(events *models.CadenceEvents) error {
func (e *Engine) processEvents(ctx context.Context, events *models.CadenceEvents) error {
e.log.Info().
Uint64("cadence-height", events.CadenceHeight()).
Int("cadence-event-length", events.Length()).
Expand All @@ -168,8 +171,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return nil // nothing else to do this was heartbeat event with not event payloads
}

// TODO(JanezP): accounts need an indexed batch. Investigate why and try to switch to non-indexed batch
batch := e.store.NewIndexedBatch()
batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
Expand Down Expand Up @@ -255,6 +257,14 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
}
}

select {
case <-ctx.Done():
// Temporary solution to avoid committing the batch when the DB is closed
// TODO(JanezP): handle this better
return nil
default:
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down Expand Up @@ -327,10 +337,6 @@ func (e *Engine) indexTransaction(
return fmt.Errorf("failed to store tx: %s, with: %w", tx.Hash(), err)
}

if err := e.accounts.Update(tx, receipt, batch); err != nil {
return fmt.Errorf("failed to update accounts for tx: %s, with: %w", tx.Hash(), err)
}

return nil
}

Expand Down
30 changes: 0 additions & 30 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ func TestSerialBlockIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update").
Return(func() error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -77,7 +72,6 @@ func TestSerialBlockIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -139,11 +133,6 @@ func TestSerialBlockIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.Anything, mock.Anything).
Return(func(t models.TransactionCall, r *gethTypes.Receipt) error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -162,7 +151,6 @@ func TestSerialBlockIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -255,11 +243,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
return nil
})

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(tx models.Transaction, receipt *models.Receipt, _ *pebbleDB.Batch) error { return nil })

eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
Expand Down Expand Up @@ -289,7 +272,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -369,11 +351,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
On("SetLatestCadenceHeight", mock.AnythingOfType("uint64")).
Return(func(h uint64) error { return nil })

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(tx models.Transaction, receipt *models.Receipt, _ *pebbleDB.Batch) error { return nil })

eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
Expand Down Expand Up @@ -403,7 +380,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -479,11 +455,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.Anything, mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(t models.Transaction, r *models.Receipt, _ *pebbleDB.Batch) error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -503,7 +474,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down
Loading

0 comments on commit beedd0f

Please sign in to comment.