Skip to content

Commit

Permalink
feat(sentry): Disable fetching committees if not needed (#388)
Browse files Browse the repository at this point in the history
* feat(sentry): Disable fetching committees if not needed

* feat(sentry): Disable fetching committees if not needed

* fix: Improve error handling in duties service
  • Loading branch information
samcm authored Oct 7, 2024
1 parent c5fd9b2 commit 683d129
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/sentry/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type BeaconNode struct {
onReadyCallbacks []func(ctx context.Context) error
}

func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) {
func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger, opt *Options) (*BeaconNode, error) {
opts := *beacon.
DefaultOptions().
DisablePrometheusMetrics()
Expand All @@ -48,7 +48,7 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.
}, "xatu_sentry", opts)

metadata := services.NewMetadataService(log, node, config.OverrideNetworkName)
duties := services.NewDutiesService(log, node, &metadata)
duties := services.NewDutiesService(log, node, &metadata, opt.FetchProposerDuties, opt.FetchBeaconCommittees)

svcs := []services.Service{
&metadata,
Expand Down
18 changes: 18 additions & 0 deletions pkg/sentry/ethereum/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ethereum

type Options struct {
FetchBeaconCommittees bool
FetchProposerDuties bool
}

func (o *Options) WithFetchBeaconCommittees(fetchBeaconCommittees bool) *Options {
o.FetchBeaconCommittees = fetchBeaconCommittees

return o
}

func (o *Options) WithFetchProposerDuties(fetchProposerDuties bool) *Options {
o.FetchProposerDuties = fetchProposerDuties

return o
}
73 changes: 72 additions & 1 deletion pkg/sentry/ethereum/services/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ type DutiesService struct {

metadata *MetadataService

proposerDutiesEnabled bool
beaconCommitteesEnabled bool

bootstrapped bool

onReadyCallbacks []func(context.Context) error

lastSyncState bool
}

func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService) DutiesService {
func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService, proposerDutiesEnabled, beaconCommitteesEnabled bool) DutiesService {
return DutiesService{
beacon: sbeacon,
log: log.WithField("module", "sentry/ethereum/duties"),
Expand All @@ -59,11 +62,35 @@ func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *Met

bootstrapped: false,

proposerDutiesEnabled: proposerDutiesEnabled,
beaconCommitteesEnabled: beaconCommitteesEnabled,

lastSyncState: false,
}
}

func (m *DutiesService) Start(ctx context.Context) error {
m.log.WithFields(logrus.Fields{
"proposer_duties_enabled": m.proposerDutiesEnabled,
"beacon_committees_enabled": m.beaconCommitteesEnabled,
}).Info("Starting duties service")

if !m.proposerDutiesEnabled && !m.beaconCommitteesEnabled {
m.log.Info("Duties service is disabled")

if err := m.Ready(ctx); err != nil {
return fmt.Errorf("failed to fire on ready callback: %w", err)
}

for _, fn := range m.onReadyCallbacks {
if err := fn(ctx); err != nil {
return fmt.Errorf("failed to fire on ready callback: %w", err)
}
}

return nil
}

go func() {
operation := func() error {
if err := m.fetchRequiredEpochDuties(ctx, false); err != nil {
Expand Down Expand Up @@ -99,6 +126,10 @@ func (m *DutiesService) Start(ctx context.Context) error {

// Fetch beacon committees
m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
if !m.beaconCommitteesEnabled {
return
}

// Sleep for a bit to give the beacon node a chance to run its epoch transition.
// We don't really care about nice-to-have duties so the sleep here is fine.
// "Required" duties (aka the current epoch) will be refetched the moment that epoch
Expand All @@ -117,6 +148,10 @@ func (m *DutiesService) Start(ctx context.Context) error {

// Fetch proposer duties
m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
if !m.proposerDutiesEnabled {
return
}

// Sleep for a bit to give the beacon node a chance to run its epoch transition.
time.Sleep(100 * time.Millisecond)

Expand All @@ -127,6 +162,10 @@ func (m *DutiesService) Start(ctx context.Context) error {

// Anticipate the next epoch and fetch the next epoch's beacon committees.
m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
if !m.beaconCommitteesEnabled {
return
}

// Sleep until just before the start of the next epoch to fetch the next epoch's duties.
time.Sleep(epoch.TimeWindow().EndsIn() - 2*time.Second)

Expand All @@ -144,6 +183,10 @@ func (m *DutiesService) Start(ctx context.Context) error {
})

m.beacon.OnChainReOrg(ctx, func(ctx context.Context, ev *v1.ChainReorgEvent) error {
if !m.beaconCommitteesEnabled {
return nil
}

m.log.Info("Chain reorg detected - refetching beacon committees")

if err := m.fetchRequiredEpochDuties(ctx, true); err != nil {
Expand All @@ -154,6 +197,10 @@ func (m *DutiesService) Start(ctx context.Context) error {
})

m.beacon.OnSyncStatus(ctx, func(ctx context.Context, ev *beacon.SyncStatusEvent) error {
if !m.beaconCommitteesEnabled {
return nil
}

if ev.State.IsSyncing != m.lastSyncState {
m.log.WithFields(logrus.Fields{
"is_syncing": ev.State.IsSyncing,
Expand Down Expand Up @@ -201,6 +248,10 @@ func (m *DutiesService) Name() Name {
}

func (m *DutiesService) RequiredEpochDuties(ctx context.Context) []phase0.Epoch {
if !m.beaconCommitteesEnabled {
return []phase0.Epoch{}
}

now := m.metadata.Wallclock().Epochs().Current()

epochNumber := now.Number()
Expand All @@ -213,6 +264,10 @@ func (m *DutiesService) RequiredEpochDuties(ctx context.Context) []phase0.Epoch
}

func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoch {
if !m.beaconCommitteesEnabled {
return []phase0.Epoch{}
}

now := m.metadata.Wallclock().Epochs().Current()

epochNumber := now.Number()
Expand All @@ -237,6 +292,10 @@ func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoc
}

func (m *DutiesService) Ready(ctx context.Context) error {
if !m.beaconCommitteesEnabled {
return nil
}

for _, epoch := range m.RequiredEpochDuties(ctx) {
if duties := m.beaconCommittees.Get(epoch); duties == nil {
return fmt.Errorf("duties for epoch %d are not ready", epoch)
Expand All @@ -247,6 +306,10 @@ func (m *DutiesService) Ready(ctx context.Context) error {
}

func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCache ...bool) error {
if !m.beaconCommitteesEnabled {
return nil
}

if m.metadata.Wallclock() == nil {
return fmt.Errorf("metadata service is not ready")
}
Expand All @@ -263,6 +326,10 @@ func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCa
}

func (m *DutiesService) fetchNiceToHaveEpochDuties(ctx context.Context) error {
if !m.beaconCommitteesEnabled {
return nil
}

if m.metadata.Wallclock() == nil {
return fmt.Errorf("metadata service is not ready")
}
Expand Down Expand Up @@ -295,6 +362,10 @@ func (m *DutiesService) fireOnProposerDutiesSubscriptions(epoch phase0.Epoch, du
}

func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, overrideCache ...bool) error {
if !m.beaconCommitteesEnabled {
return fmt.Errorf("beacon committees are not enabled")
}

if len(overrideCache) != 0 && !overrideCache[0] {
if duties := m.beaconCommittees.Get(epoch); duties != nil {
return nil
Expand Down
46 changes: 44 additions & 2 deletions pkg/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/attestantio/go-eth2-client/spec/altair"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/beevik/ntp"
"github.com/ethpandaops/beacon/pkg/beacon"
"github.com/ethpandaops/xatu/pkg/networks"
"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/output"
Expand Down Expand Up @@ -115,7 +116,48 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides
return nil, err
}

beacon, err := ethereum.NewBeaconNode(ctx, config.Name, &config.Ethereum, log)
beaconOpts := ethereum.Options{}

hasAttestationSubscription := false

attestationTopic := "attestation"

if config.Ethereum.BeaconSubscriptions != nil {
for _, topic := range *config.Ethereum.BeaconSubscriptions {
if topic == attestationTopic {
hasAttestationSubscription = true

break
}
}
} else if beacon.DefaultEnabledBeaconSubscriptionOptions().Enabled {
// If no subscriptions have been provided in config, we need to check if the default options have it enabled
for _, topic := range beacon.DefaultEnabledBeaconSubscriptionOptions().Topics {
if topic == attestationTopic {
hasAttestationSubscription = true

break
}
}
}

if hasAttestationSubscription {
log.Info("Enabling beacon committees as we are subscribed to attestation events")

beaconOpts.WithFetchBeaconCommittees(true)
}

if config.BeaconCommittees != nil && config.BeaconCommittees.Enabled {
log.Info("Enabling beacon committees as we need to fetch them on interval")
beaconOpts.WithFetchBeaconCommittees(true)
}

if config.ProposerDuty != nil && config.ProposerDuty.Enabled {
log.Info("Enabling proposer duties as we need to fetch them on interval")
beaconOpts.WithFetchProposerDuties(true)
}

b, err := ethereum.NewBeaconNode(ctx, config.Name, &config.Ethereum, log, &beaconOpts)
if err != nil {
return nil, err
}
Expand All @@ -126,7 +168,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides
s := &Sentry{
Config: config,
sinks: sinks,
beacon: beacon,
beacon: b,
clockDrift: time.Duration(0),
log: log,
duplicateCache: duplicateCache,
Expand Down

0 comments on commit 683d129

Please sign in to comment.