diff --git a/pkg/sentry/ethereum/beacon.go b/pkg/sentry/ethereum/beacon.go index 913be3b9..aaaf739b 100644 --- a/pkg/sentry/ethereum/beacon.go +++ b/pkg/sentry/ethereum/beacon.go @@ -24,7 +24,7 @@ type BeaconNode struct { onReadyCallbacks []func(ctx context.Context) error } -func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger) (*BeaconNode, error) { +func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus.FieldLogger, opt *Options) (*BeaconNode, error) { opts := *beacon. DefaultOptions(). DisablePrometheusMetrics() @@ -48,7 +48,7 @@ func NewBeaconNode(ctx context.Context, name string, config *Config, log logrus. }, "xatu_sentry", opts) metadata := services.NewMetadataService(log, node, config.OverrideNetworkName) - duties := services.NewDutiesService(log, node, &metadata) + duties := services.NewDutiesService(log, node, &metadata, opt.FetchProposerDuties, opt.FetchBeaconCommittees) svcs := []services.Service{ &metadata, diff --git a/pkg/sentry/ethereum/option.go b/pkg/sentry/ethereum/option.go new file mode 100644 index 00000000..78f8fa08 --- /dev/null +++ b/pkg/sentry/ethereum/option.go @@ -0,0 +1,18 @@ +package ethereum + +type Options struct { + FetchBeaconCommittees bool + FetchProposerDuties bool +} + +func (o *Options) WithFetchBeaconCommittees(fetchBeaconCommittees bool) *Options { + o.FetchBeaconCommittees = fetchBeaconCommittees + + return o +} + +func (o *Options) WithFetchProposerDuties(fetchProposerDuties bool) *Options { + o.FetchProposerDuties = fetchProposerDuties + + return o +} diff --git a/pkg/sentry/ethereum/services/duties.go b/pkg/sentry/ethereum/services/duties.go index 0c1d4bd4..a2c243d9 100644 --- a/pkg/sentry/ethereum/services/duties.go +++ b/pkg/sentry/ethereum/services/duties.go @@ -31,6 +31,9 @@ type DutiesService struct { metadata *MetadataService + proposerDutiesEnabled bool + beaconCommitteesEnabled bool + bootstrapped bool onReadyCallbacks []func(context.Context) error @@ -38,7 +41,7 @@ type DutiesService struct { lastSyncState bool } -func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService) DutiesService { +func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *MetadataService, proposerDutiesEnabled, beaconCommitteesEnabled bool) DutiesService { return DutiesService{ beacon: sbeacon, log: log.WithField("module", "sentry/ethereum/duties"), @@ -59,11 +62,35 @@ func NewDutiesService(log logrus.FieldLogger, sbeacon beacon.Node, metadata *Met bootstrapped: false, + proposerDutiesEnabled: proposerDutiesEnabled, + beaconCommitteesEnabled: beaconCommitteesEnabled, + lastSyncState: false, } } func (m *DutiesService) Start(ctx context.Context) error { + m.log.WithFields(logrus.Fields{ + "proposer_duties_enabled": m.proposerDutiesEnabled, + "beacon_committees_enabled": m.beaconCommitteesEnabled, + }).Info("Starting duties service") + + if !m.proposerDutiesEnabled && !m.beaconCommitteesEnabled { + m.log.Info("Duties service is disabled") + + if err := m.Ready(ctx); err != nil { + return fmt.Errorf("failed to fire on ready callback: %w", err) + } + + for _, fn := range m.onReadyCallbacks { + if err := fn(ctx); err != nil { + return fmt.Errorf("failed to fire on ready callback: %w", err) + } + } + + return nil + } + go func() { operation := func() error { if err := m.fetchRequiredEpochDuties(ctx, false); err != nil { @@ -99,6 +126,10 @@ func (m *DutiesService) Start(ctx context.Context) error { // Fetch beacon committees m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) { + if !m.beaconCommitteesEnabled { + return + } + // Sleep for a bit to give the beacon node a chance to run its epoch transition. // We don't really care about nice-to-have duties so the sleep here is fine. // "Required" duties (aka the current epoch) will be refetched the moment that epoch @@ -117,6 +148,10 @@ func (m *DutiesService) Start(ctx context.Context) error { // Fetch proposer duties m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) { + if !m.proposerDutiesEnabled { + return + } + // Sleep for a bit to give the beacon node a chance to run its epoch transition. time.Sleep(100 * time.Millisecond) @@ -127,6 +162,10 @@ func (m *DutiesService) Start(ctx context.Context) error { // Anticipate the next epoch and fetch the next epoch's beacon committees. m.metadata.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) { + if !m.beaconCommitteesEnabled { + return + } + // Sleep until just before the start of the next epoch to fetch the next epoch's duties. time.Sleep(epoch.TimeWindow().EndsIn() - 2*time.Second) @@ -144,6 +183,10 @@ func (m *DutiesService) Start(ctx context.Context) error { }) m.beacon.OnChainReOrg(ctx, func(ctx context.Context, ev *v1.ChainReorgEvent) error { + if !m.beaconCommitteesEnabled { + return nil + } + m.log.Info("Chain reorg detected - refetching beacon committees") if err := m.fetchRequiredEpochDuties(ctx, true); err != nil { @@ -154,6 +197,10 @@ func (m *DutiesService) Start(ctx context.Context) error { }) m.beacon.OnSyncStatus(ctx, func(ctx context.Context, ev *beacon.SyncStatusEvent) error { + if !m.beaconCommitteesEnabled { + return nil + } + if ev.State.IsSyncing != m.lastSyncState { m.log.WithFields(logrus.Fields{ "is_syncing": ev.State.IsSyncing, @@ -201,6 +248,10 @@ func (m *DutiesService) Name() Name { } func (m *DutiesService) RequiredEpochDuties(ctx context.Context) []phase0.Epoch { + if !m.beaconCommitteesEnabled { + return []phase0.Epoch{} + } + now := m.metadata.Wallclock().Epochs().Current() epochNumber := now.Number() @@ -213,6 +264,10 @@ func (m *DutiesService) RequiredEpochDuties(ctx context.Context) []phase0.Epoch } func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoch { + if !m.beaconCommitteesEnabled { + return []phase0.Epoch{} + } + now := m.metadata.Wallclock().Epochs().Current() epochNumber := now.Number() @@ -237,6 +292,10 @@ func (m *DutiesService) NiceToHaveEpochDuties(ctx context.Context) []phase0.Epoc } func (m *DutiesService) Ready(ctx context.Context) error { + if !m.beaconCommitteesEnabled { + return nil + } + for _, epoch := range m.RequiredEpochDuties(ctx) { if duties := m.beaconCommittees.Get(epoch); duties == nil { return fmt.Errorf("duties for epoch %d are not ready", epoch) @@ -247,6 +306,10 @@ func (m *DutiesService) Ready(ctx context.Context) error { } func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCache ...bool) error { + if !m.beaconCommitteesEnabled { + return nil + } + if m.metadata.Wallclock() == nil { return fmt.Errorf("metadata service is not ready") } @@ -263,6 +326,10 @@ func (m *DutiesService) fetchRequiredEpochDuties(ctx context.Context, overrideCa } func (m *DutiesService) fetchNiceToHaveEpochDuties(ctx context.Context) error { + if !m.beaconCommitteesEnabled { + return nil + } + if m.metadata.Wallclock() == nil { return fmt.Errorf("metadata service is not ready") } @@ -295,6 +362,10 @@ func (m *DutiesService) fireOnProposerDutiesSubscriptions(epoch phase0.Epoch, du } func (m *DutiesService) fetchBeaconCommittee(ctx context.Context, epoch phase0.Epoch, overrideCache ...bool) error { + if !m.beaconCommitteesEnabled { + return fmt.Errorf("beacon committees are not enabled") + } + if len(overrideCache) != 0 && !overrideCache[0] { if duties := m.beaconCommittees.Get(epoch); duties != nil { return nil diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index e6f55972..cb7338b4 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -19,6 +19,7 @@ import ( "github.com/attestantio/go-eth2-client/spec/altair" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/beevik/ntp" + "github.com/ethpandaops/beacon/pkg/beacon" "github.com/ethpandaops/xatu/pkg/networks" "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/output" @@ -115,7 +116,48 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides return nil, err } - beacon, err := ethereum.NewBeaconNode(ctx, config.Name, &config.Ethereum, log) + beaconOpts := ethereum.Options{} + + hasAttestationSubscription := false + + attestationTopic := "attestation" + + if config.Ethereum.BeaconSubscriptions != nil { + for _, topic := range *config.Ethereum.BeaconSubscriptions { + if topic == attestationTopic { + hasAttestationSubscription = true + + break + } + } + } else if beacon.DefaultEnabledBeaconSubscriptionOptions().Enabled { + // If no subscriptions have been provided in config, we need to check if the default options have it enabled + for _, topic := range beacon.DefaultEnabledBeaconSubscriptionOptions().Topics { + if topic == attestationTopic { + hasAttestationSubscription = true + + break + } + } + } + + if hasAttestationSubscription { + log.Info("Enabling beacon committees as we are subscribed to attestation events") + + beaconOpts.WithFetchBeaconCommittees(true) + } + + if config.BeaconCommittees != nil && config.BeaconCommittees.Enabled { + log.Info("Enabling beacon committees as we need to fetch them on interval") + beaconOpts.WithFetchBeaconCommittees(true) + } + + if config.ProposerDuty != nil && config.ProposerDuty.Enabled { + log.Info("Enabling proposer duties as we need to fetch them on interval") + beaconOpts.WithFetchProposerDuties(true) + } + + b, err := ethereum.NewBeaconNode(ctx, config.Name, &config.Ethereum, log, &beaconOpts) if err != nil { return nil, err } @@ -126,7 +168,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides s := &Sentry{ Config: config, sinks: sinks, - beacon: beacon, + beacon: b, clockDrift: time.Duration(0), log: log, duplicateCache: duplicateCache,