Skip to content

Commit

Permalink
Refactor client run
Browse files Browse the repository at this point in the history
  • Loading branch information
coa-telos committed Sep 30, 2024
1 parent 7426a0d commit 828163e
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 84 deletions.
126 changes: 43 additions & 83 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ impl ConsensusClient {

pub async fn run(mut self, mut rx: mpsc::Receiver<TelosEVMBlock>) -> Result<(), Error> {
let mut batch = vec![];
let mut lib: Option<data::Block> = self.db.get_lib()?;
let block_delta = self.config.chain_id.block_delta();
let chain_id = &self.config.chain_id;
let mut lib: data::Block = self.db.get_lib()?.unwrap_or_default();
loop {
let message = tokio::select! {
message = rx.recv() => message,
Expand All @@ -131,113 +131,73 @@ impl ConsensusClient {
break;
};

let block_num = block.block_num.as_u64();
let block_hash = block.block_hash;
let block_lib_num = block.lib_num;
if block_num % self.config.block_checkpoint_interval.as_u64() != 0 {
self.db.put_block(From::from(&block))?;
debug!("Block {} put in the database", block.block_num);
}
let block_num = block.block_num;

if block_num % self.config.block_checkpoint_interval.as_u64() == 0 {
self.db.put_block(From::from(&block))?;
debug!("Block {} put in the database", block.block_num);
}
self.db.put_block(From::from(&block))?;
debug!("Block {block_num} put in the database");

let latest_start: u32 = block_num
.saturating_sub(self.config.latest_blocks_in_db_num.into())
.as_u32();
let latest_start = block_num.saturating_sub(self.config.latest_blocks_in_db_num);

// Keep latest blocks and every nth block
if latest_start > 0 && latest_start % self.config.block_checkpoint_interval != 0 {
self.db.delete_block(latest_start)?;
debug!("Block {} delete from the database", latest_start);
debug!("Block {latest_start} deleted from the database");
}

let mut is_new_lib = false;
if lib
.as_ref()
.map(|lib| lib.number < block_lib_num)
.unwrap_or(true)
{
is_new_lib = true;
lib = Some(From::from(Lib(&block)));
self.db.put_lib(From::from(Lib(&block)))?;
info!(
"LIB {} put in the database hash {}",
block.lib_num, block.lib_hash
);
// NOTE: Case when new lib < current one is not supported
let is_new_lib = lib.number != block.lib_num;

if is_new_lib {
let new_lib = Lib(&block);
self.db.put_lib(Lib(&block).into())?;
info!("LIB {new_lib:?} put in the database");
lib = new_lib.into();
}

if let Some((latest_num, latest_hash)) = &self.latest_evm_block() {
if let Some((latest_evm_num, latest_evm_hash)) = self.latest_evm_block() {
// Check fork
if block_num == latest_num.as_u64() && &block_hash.to_string() != latest_hash {
error!("Fork detected! Latest executor block hash {latest_num:?} does not match consensus block hash {block_num:?}" );
if block_num == latest_evm_num && block.block_hash.to_string() != latest_evm_hash {
error!("Fork detected! Latest executor block hash {latest_evm_num:?} does not match consensus block hash {block_num:?}");
return Err(Error::ExecutorHashMismatch);
}

// Skip synced blocks
if block_num <= latest_num.as_u64() {
debug!("Block {block_num} skipped as its behind {latest_num} evm block");
if block_num <= latest_evm_num {
debug!("Block {block_num} skipped as its behind {latest_evm_num} evm block");
continue;
}
}

let block_hash = block.block_hash;
let block_is_final = block.is_final(chain_id);
let block_is_lib = block.is_lib(chain_id);
let lib_evm_num = block.lib_evm_num(chain_id);

batch.push(block);
// check if we caught up to head
// if lib is greater than current block send in batches
// if lib is less than current block batch size is 1 or more blocks
let block_num_with_delta = block_num.as_u32() + block_delta;
let flush = match lib.as_ref() {
Some(lib) if lib.number <= block_num_with_delta => true,
// Some(lib) if lib.number > block_num_with_delta && (lib.number - block_num_with_delta < self.config.batch_size.as_u32()) => true,
_ => batch.len() == self.config.batch_size,
};

// if LIB is less or equal than current block batch size is 1 or more blocks
// if LIB is greater than current block send in batches
let flush = !block_is_final || block_is_lib || batch.len() == self.config.batch_size;

if !flush {
info!(
"Added block {} lib {}",
block_num + block_delta.as_u64(),
lib.clone().unwrap().number
);
continue;
}
info!(
"Batch size {} latest lib {}, block num {} lib hash {}",
batch.len(),
lib.clone().unwrap().number,
block_num + block_delta.as_u64(),
lib.clone().unwrap().hash
);
batch.iter().for_each(|block| {
info!(
"number {:?} {:?}",
block.block_num + block_delta,
block.block_hash
);
});

let lib_num = lib.as_ref().map(|lib| lib.number);
let is_block_final = block_lib_num >= block_num_with_delta;

match lib_num.as_ref() {
None => debug!("Default finalized hash is the last one in the batch"),
Some(_) if is_block_final => debug!("Synced to head, LIB < current block"),
Some(_) if is_new_lib => debug!("New LIB is detected"),
Some(_) => debug!("Synced to head, LIB is unchanged"),
};

let finalized_hash = match lib_num {
None => Some(block_hash),
Some(_) if is_block_final => Some(block_hash),
// if lib hash has been changed we should send finalized hash for fc update
Some(_) if is_new_lib => self
.db
.get_block_or_prev(block_lib_num - block_delta)?
.map(|block| block.hash.parse().unwrap()),
Some(_) => None,
let finalized_hash = if block_is_final {
debug!("Synced to head, LIB < current block");
Some(block_hash)
} else if is_new_lib {
// if lib hash has been changed we should send finalized hash for fork choice update
debug!("New LIB is detected");
self.db
.get_block_or_prev(lib_evm_num)?
.map(|block| block.hash.parse().unwrap())
} else {
debug!("Synced to head, LIB is unchanged");
None
};

debug!("Send batch fc {:?}", finalized_hash);
debug!("Send batch finalized hash: {finalized_hash:?}",);
self.send_batch(&batch, finalized_hash).await?;
batch.clear();
}
Expand Down
16 changes: 16 additions & 0 deletions client/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use alloy::primitives::B256;
use eyre::{eyre, Context};
use rocksdb::{DBWithThreadMode, SingleThreaded, DB};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Display};

Check warning on line 5 in client/src/data.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `Display`
use std::io::Read;

Check warning on line 6 in client/src/data.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `std::io::Read`
use std::str::FromStr;

Check warning on line 7 in client/src/data.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `std::str::FromStr`
use std::{fs, path::Path, sync::Arc};
Expand All @@ -17,8 +18,23 @@ pub struct Block {
pub hash: String,
}

impl Default for Block {
fn default() -> Self {
Self {
number: 0,
hash: Default::default(),
}
}
}

pub struct Lib<'a>(pub &'a TelosEVMBlock);

impl Debug for Lib<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "number: {}, hash: {}", self.0.lib_num, self.0.lib_hash)
}
}

impl From<&TelosEVMBlock> for Block {
fn from(value: &TelosEVMBlock) -> Self {
Block {
Expand Down
20 changes: 19 additions & 1 deletion translator/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::types::names::*;
use crate::types::ship_types::{
ActionTrace, ContractRow, GetBlocksResultV0, SignedBlock, TableDelta, TransactionTrace,
};
use crate::types::translator_types::NameToAddressCache;
use crate::types::translator_types::{ChainId, NameToAddressCache};
use alloy::primitives::{Bloom, Bytes, FixedBytes, B256, U256};
use alloy_consensus::constants::{EMPTY_OMMER_ROOT_HASH, EMPTY_ROOT_HASH};
use alloy_consensus::{Header, TxEnvelope};
Expand Down Expand Up @@ -115,6 +115,24 @@ pub struct TelosEVMBlock {
pub extra_fields: TelosEngineAPIExtraFields,
}

impl TelosEVMBlock {
pub fn lib_evm_num(&self, chain_id: &ChainId) -> u32 {
self.lib_num.saturating_sub(chain_id.block_delta())
}

pub fn block_num_with_delta(&self, chain_id: &ChainId) -> u32 {
self.block_num + chain_id.block_delta()
}

pub fn is_final(&self, chain_id: &ChainId) -> bool {
self.block_num_with_delta(chain_id) <= self.lib_num
}

pub fn is_lib(&self, chain_id: &ChainId) -> bool {
self.block_num_with_delta(chain_id) == self.lib_num
}
}

pub fn decode<T: Packer + Default>(raw: &[u8]) -> T {
let mut result = T::default();
result.unpack(raw);
Expand Down

0 comments on commit 828163e

Please sign in to comment.