Skip to content

Commit

Permalink
Merge pull request #170 from onflow/gregor/stream/blocks-events-broad…
Browse files Browse the repository at this point in the history
…cast

Streaming of new heads
  • Loading branch information
sideninja authored Apr 11, 2024
2 parents ac48b9c + 9484956 commit df4fa6d
Show file tree
Hide file tree
Showing 55 changed files with 5,258 additions and 3,714 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
integration/db-test
tests/db-test
tests/web3js/node_modules
tests/web3js/package-lock.json
db
flow.json
flow*.json
.idea
/tests/web3js/node_modules
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test:
.PHONY: e2e-test
e2e-test:
# test all packages
cd integration && go test -cover ./...
cd tests && go test -cover ./...

.PHONY: check-tidy
check-tidy:
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ The application can be configured using the following flags at runtime:
| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. |
| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. |
| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') |
| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second |
| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client |


## Getting Started

Expand Down
58 changes: 18 additions & 40 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"github.com/rs/zerolog"
)

func SupportedAPIs(blockChainAPI *BlockChainAPI, pullAPI *PullAPI) []rpc.API {
func SupportedAPIs(blockChainAPI *BlockChainAPI, streamAPI *StreamAPI, pullAPI *PullAPI) []rpc.API {
return []rpc.API{{
Namespace: "eth",
Service: blockChainAPI,
}, {
Namespace: "eth",
Service: streamAPI,
}, {
Namespace: "eth",
Service: pullAPI,
Expand Down Expand Up @@ -147,60 +150,35 @@ func (b *BlockChainAPI) GetBalance(
func (b *BlockChainAPI) GetTransactionByHash(
ctx context.Context,
hash common.Hash,
) (*RPCTransaction, error) {
) (*Transaction, error) {
tx, err := b.transactions.Get(hash)
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

txHash, err := tx.Hash()
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
b.logger.Error().Err(err).Any("tx", tx).Msg("failed to calculate tx hash")
return nil, errs.ErrInternal
}

rcp, err := b.receipts.GetByTransactionID(txHash)
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

from, err := tx.From()
if err != nil {
b.logger.Error().Err(err).Msg("failed to calculate sender")
return nil, errs.ErrInternal
}

v, r, s := tx.RawSignatureValues()
index := uint64(rcp.TransactionIndex)

txResult := &RPCTransaction{
Hash: txHash,
BlockHash: &rcp.BlockHash,
BlockNumber: (*hexutil.Big)(rcp.BlockNumber),
From: from,
To: tx.To(),
Gas: hexutil.Uint64(rcp.GasUsed),
GasPrice: (*hexutil.Big)(rcp.EffectiveGasPrice),
Input: tx.Data(),
Nonce: hexutil.Uint64(tx.Nonce()),
TransactionIndex: (*hexutil.Uint64)(&index),
Value: (*hexutil.Big)(tx.Value()),
Type: hexutil.Uint64(tx.Type()),
V: (*hexutil.Big)(v),
R: (*hexutil.Big)(r),
S: (*hexutil.Big)(s),
}
return txResult, nil
return NewTransaction(tx, *rcp)
}

// GetTransactionByBlockHashAndIndex returns the transaction for the given block hash and index.
func (b *BlockChainAPI) GetTransactionByBlockHashAndIndex(
ctx context.Context,
blockHash common.Hash,
index hexutil.Uint,
) (*RPCTransaction, error) {
) (*Transaction, error) {
block, err := b.blocks.GetByID(blockHash)
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

highestIndex := len(block.TransactionHashes) - 1
Expand All @@ -211,7 +189,7 @@ func (b *BlockChainAPI) GetTransactionByBlockHashAndIndex(
txHash := block.TransactionHashes[index]
tx, err := b.GetTransactionByHash(ctx, txHash)
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

return tx, nil
Expand All @@ -222,10 +200,10 @@ func (b *BlockChainAPI) GetTransactionByBlockNumberAndIndex(
ctx context.Context,
blockNumber rpc.BlockNumber,
index hexutil.Uint,
) (*RPCTransaction, error) {
) (*Transaction, error) {
block, err := b.blocks.GetByHeight(uint64(blockNumber))
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

highestIndex := len(block.TransactionHashes) - 1
Expand All @@ -236,7 +214,7 @@ func (b *BlockChainAPI) GetTransactionByBlockNumberAndIndex(
txHash := block.TransactionHashes[index]
tx, err := b.GetTransactionByHash(ctx, txHash)
if err != nil {
return handleError[*RPCTransaction](b.logger, err)
return handleError[*Transaction](b.logger, err)
}

return tx, nil
Expand Down Expand Up @@ -750,8 +728,8 @@ func (b *BlockChainAPI) Hashrate() hexutil.Uint64 {
func (b *BlockChainAPI) fetchBlockTransactions(
ctx context.Context,
block *evmTypes.Block,
) ([]*RPCTransaction, error) {
transactions := make([]*RPCTransaction, 0)
) ([]*Transaction, error) {
transactions := make([]*Transaction, 0)
for _, txHash := range block.TransactionHashes {
transaction, err := b.GetTransactionByHash(ctx, txHash)
if err != nil {
Expand Down
97 changes: 71 additions & 26 deletions api/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package api

import (
"errors"
errs "github.com/onflow/flow-evm-gateway/api/errors"
"github.com/onflow/flow-evm-gateway/models"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)

var (
errFilterNotFound = errors.New("filter not found")
errExceedMaxTopics = errors.New("exceed max topics")
errFilterNotFound = errors.New("filter not found")
errExceedMaxTopics = errors.New("exceed max topics")
errExceedMaxAddresses = errors.New("exceed max addresses")
)

// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
const maxTopics = 4

// The maximum number of addresses allowed
const maxAddresses = 6

// TransactionArgs represents the arguments to construct a new transaction
// or a message call.
type TransactionArgs struct {
Expand Down Expand Up @@ -72,30 +78,69 @@ type StorageResult struct {
Proof []string `json:"proof"`
}

// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction
type RPCTransaction struct {
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`
// Transaction represents a transaction that will serialize to the RPC representation of a transaction
type Transaction struct {
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From *common.MixedcaseAddress `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
GasFeeCap *hexutil.Big `json:"maxFeePerGas,omitempty"`
GasTipCap *hexutil.Big `json:"maxPriorityFeePerGas,omitempty"`
MaxFeePerBlobGas *hexutil.Big `json:"maxFeePerBlobGas,omitempty"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.MixedcaseAddress `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
Type hexutil.Uint64 `json:"type"`
Accesses *types.AccessList `json:"accessList,omitempty"`
ChainID *hexutil.Big `json:"chainId,omitempty"`
BlobVersionedHashes []common.Hash `json:"blobVersionedHashes,omitempty"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
YParity *hexutil.Uint64 `json:"yParity,omitempty"`
}

func NewTransaction(tx models.Transaction, receipt types.Receipt) (*Transaction, error) {
txHash, err := tx.Hash()
if err != nil {
return nil, err
}

f, err := tx.From()
if err != nil {
return nil, errs.ErrInternal
}
from := common.NewMixedcaseAddress(f)

var to common.MixedcaseAddress
if t := tx.To(); t != nil {
to = common.NewMixedcaseAddress(*t)
}

v, r, s := tx.RawSignatureValues()
index := uint64(receipt.TransactionIndex)

return &Transaction{
Hash: txHash,
BlockHash: &receipt.BlockHash,
BlockNumber: (*hexutil.Big)(receipt.BlockNumber),
From: &from,
To: &to,
Gas: hexutil.Uint64(receipt.GasUsed),
GasPrice: (*hexutil.Big)(receipt.EffectiveGasPrice),
Input: tx.Data(),
Nonce: hexutil.Uint64(tx.Nonce()),
TransactionIndex: (*hexutil.Uint64)(&index),
Value: (*hexutil.Big)(tx.Value()),
Type: hexutil.Uint64(tx.Type()),
V: (*hexutil.Big)(v),
R: (*hexutil.Big)(r),
S: (*hexutil.Big)(s),
}, nil
}

// SignTransactionResult represents a RLP encoded signed transaction.
Expand Down
6 changes: 2 additions & 4 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,8 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

ws := h.wsHandler
if ws != nil && isWebSocket(r) {
if checkPath(r, "") {
ws.ServeHTTP(w, r)
return
}
ws.ServeHTTP(w, r)
return
}

// enable logging responses
Expand Down
Loading

0 comments on commit df4fa6d

Please sign in to comment.