Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relay-monitor): Add MEV Relay Payload Delivered #373

Merged
merged 6 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ transforms:
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -889,4 +890,20 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
mev_relay_proposer_payload_delivered_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.mev_relay_proposer_payload_delivered
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: mev-relay-proposer-payload-delivered
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
109 changes: 106 additions & 3 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ transforms:
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -394,6 +395,7 @@ transforms:
- xatu_server_events_router.mempool_transaction
- xatu_server_events_router.mempool_transaction_v2
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
metrics:
- type: counter
field: event.name
Expand All @@ -414,6 +416,7 @@ transforms:
tags:
event: "{{event.name}}"
source: "xatu-kafka-clickhouse"

beacon_api_eth_v1_beacon_committee_formatted:
type: remap
inputs:
Expand Down Expand Up @@ -448,7 +451,6 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.updated_date_time = to_unix_timestamp(now())

del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -1620,7 +1622,7 @@ transforms:
del(.event)
del(.meta)
del(.data)
canonical_beacon_block_elatorated_attestation_formatted:
canonical_beacon_block_elaborated_attestation_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_block_elaborated_attestation
Expand Down Expand Up @@ -1837,6 +1839,87 @@ transforms:
del(.meta_consensus_implementation)
del(.meta_network_id)

del(.event)
del(.meta)
del(.data)
mev_relay_proposer_payload_delivered_formatted:
type: remap
inputs:
- xatu_server_events_router.mev_relay_proposer_payload_delivered
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

.slot = .data.slot
.block_number = .data.block_number
.payload = .data.payload

slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.relay_name = .meta.client.additional_data.relay.name
.parent_hash = .data.parent_hash
.block_hash = .data.block_hash
.builder_pubkey = .data.builder_pubkey
.proposer_pubkey = .data.proposer_pubkey
.proposer_fee_recipient = .data.proposer_fee_recipient
.gas_limit = .data.gas_limit
.gas_used = .data.gas_used
.payload = .data.payload
.block_number = .data.block_number
.num_tx = .data.num_tx
.timestamp = .data.timestamp
.timestamp_ms = .data.timestamp_ms

.updated_date_time = to_unix_timestamp(now())


del(.meta_consensus_implementation)
del(.meta_network_id)


del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -2351,7 +2434,7 @@ sinks:
canonical_beacon_block_elaborated_attestation_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_block_elatorated_attestation_formatted
- canonical_beacon_block_elaborated_attestation_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_elaborated_attestation
Expand Down Expand Up @@ -2448,3 +2531,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: true
mev_relay_proposer_payload_delivered_clickhouse:
type: clickhouse
inputs:
- mev_relay_proposer_payload_delivered_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: mev_relay_proposer_payload_delivered
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS mev_relay_proposer_payload_delivered ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS mev_relay_proposer_payload_delivered_local ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
CREATE TABLE default.mev_relay_proposer_payload_delivered_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the payload was delivered' CODEC(DoubleDelta, ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number within the payload' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The start time for the slot that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`epoch` UInt32 COMMENT 'Epoch number derived from the slot that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The start time for the epoch that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot` UInt32 COMMENT 'The wallclock slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot_start_date_time` DateTime COMMENT 'The start time for the slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch` UInt32 COMMENT 'The wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch_start_date_time` DateTime COMMENT 'The start time for the wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt64 COMMENT 'The block number of the payload' CODEC(DoubleDelta, ZSTD(1)),
`relay_name` String COMMENT 'The relay that delivered the payload' CODEC(ZSTD(1)),
`block_hash` FixedString(66) COMMENT 'The block hash associated with the payload' CODEC(ZSTD(1)),
`proposer_pubkey` String COMMENT 'The proposer pubkey that received the payload' CODEC(ZSTD(1)),
`builder_pubkey` String COMMENT 'The builder pubkey that sent the payload' CODEC(ZSTD(1)),
`proposer_fee_recipient` FixedString(42) COMMENT 'The proposer fee recipient of the payload' CODEC(ZSTD(1)),
`gas_limit` UInt64 COMMENT 'The gas limit of the payload' CODEC(DoubleDelta, ZSTD(1)),
`gas_used` UInt64 COMMENT 'The gas used by the payload' CODEC(DoubleDelta, ZSTD(1)),
`num_tx` UInt32 COMMENT 'The number of transactions in the payload' CODEC(DoubleDelta, ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
relay_name,
block_hash,
meta_client_name,
builder_pubkey,
proposer_pubkey,
) COMMENT 'Contains MEV relay proposer payload delivered data.';

CREATE TABLE default.mev_relay_proposer_payload_delivered ON CLUSTER '{cluster}' AS default.mev_relay_proposer_payload_delivered_local ENGINE = Distributed(
'{cluster}',
default,
mev_relay_proposer_payload_delivered_local,
cityHash64(
slot,
meta_network_name
)
);
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ services:
"libp2p-trace-gossipsub-blob-sidecar"
"beacon-api-eth-v1-beacon-validators"
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down Expand Up @@ -514,7 +515,7 @@ services:
image: nginx:1.27.1-bookworm
container_name: xatu-nginx
ports:
- "${NGINX_ADDRESS:-127.0.0.1}:${NGINX_PORT:-80}:80"
- "${NGINX_ADDRESS:-127.0.0.1}:${NGINX_PORT:-8044}:80"
environment:
- BASE_HOSTNAME=${BASE_HOSTNAME:-example.com}
volumes:
Expand Down
Loading
Loading