Skip to content

Commit

Permalink
refactor: Improve indexer errors and file structure
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 26, 2024
1 parent 0185944 commit 441692d
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 77 deletions.
8 changes: 4 additions & 4 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cache::manager::CacheProvider;
use crate::indexer::rpc_manager::Provider;
use crate::indexer::templates::TemplateManager;
use crate::latest_block_manager::LatestBlockManager;
use alloy::providers::Provider;
use alloy::providers::Provider as AlloyProvider;
use alloy::rpc::types::eth::Block;
use alloy::rpc::types::eth::BlockNumberOrTag;
use alloy::transports::TransportError;
Expand All @@ -12,7 +12,7 @@ use std::sync::Arc;
use std::time::Duration;

pub struct BlockContext {
pub provider: CacheProvider,
pub provider: Provider,
pub templates: TemplateManager,
pub block_number: u64,
}
Expand All @@ -35,7 +35,7 @@ pub trait BlockHandler {
pub struct ProcessBlocksInput {
pub handler: BlockHandlerInstance,
pub templates: TemplateManager,
pub provider: CacheProvider,
pub provider: Provider,
pub config: BlockHandlerConfig,
}

Expand Down
3 changes: 0 additions & 3 deletions ghost-crab/src/cache/mod.rs

This file was deleted.

8 changes: 4 additions & 4 deletions ghost-crab/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::cache::manager::CacheProvider;
use crate::indexer::rpc_manager::Provider;
use crate::indexer::templates::TemplateManager;
use crate::latest_block_manager::LatestBlockManager;
use alloy::eips::BlockNumberOrTag;
use alloy::primitives::Address;
use alloy::providers::Provider;
use alloy::providers::Provider as AlloyProvider;
use alloy::rpc::types::eth::Filter;
use alloy::rpc::types::eth::Log;
use alloy::rpc::types::Block;
Expand All @@ -15,7 +15,7 @@ use std::time::Duration;

pub struct EventContext {
pub log: Log,
pub provider: CacheProvider,
pub provider: Provider,
pub templates: TemplateManager,
pub contract_address: Address,
}
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct ProcessEventsInput {
pub step: u64,
pub handler: EventHandlerInstance,
pub templates: TemplateManager,
pub provider: CacheProvider,
pub provider: Provider,
pub execution_mode: ExecutionMode,
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
use super::error::{Error, Result};
use rocksdb::DB;

#[derive(Debug)]
pub enum Error {
DB(rocksdb::Error),
CacheFileNotFound(std::io::Error),
}

type Result<T> = core::result::Result<T, Error>;

pub fn load_cache(network: &str) -> Result<DB> {
let current_dir = std::env::current_dir().map_err(|e| Error::CacheFileNotFound(e))?;
let cache_path = current_dir.join("cache").join(network);
Expand Down
36 changes: 25 additions & 11 deletions ghost-crab/src/indexer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,40 @@ use alloy::hex::FromHexError;
use core::fmt;

#[derive(Debug)]
pub enum AddHandlerError {
pub enum Error {
NotFound(String),
DB(rocksdb::Error),
NetworkNotFound(String),
InvalidAddress { address: String, error: FromHexError },
InvalidAddress(FromHexError),
CacheFileNotFound(std::io::Error),
InvalidRpcUrl(Box<dyn std::error::Error>),
}

impl fmt::Display for AddHandlerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
pub type Result<T> = core::result::Result<T, Error>;

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AddHandlerError::NotFound(handler) => {
write!(f, "Handler not found: {}", handler)
Error::NotFound(handler) => {
writeln!(f, "Handler not found: {}", handler)
}
Error::DB(error) => {
writeln!(f, "Error while loading cache: {}", error)
}
Error::NetworkNotFound(network) => {
writeln!(f, "Network not found: {}", network)
}
Error::InvalidAddress(error) => {
writeln!(f, "Invalid address: {}", error)
}
AddHandlerError::NetworkNotFound(network) => {
write!(f, "Network not found: {}", network)
Error::CacheFileNotFound(error) => {
writeln!(f, "Cache file not found: {}", error)
}
AddHandlerError::InvalidAddress { address, error } => {
write!(f, "Invalid address: {}.\nError: {}", address, error)
Error::InvalidRpcUrl(error) => {
writeln!(f, "Invalid RPC url: {}", error)
}
}
}
}

impl std::error::Error for AddHandlerError {}
impl std::error::Error for Error {}
37 changes: 15 additions & 22 deletions ghost-crab/src/indexer/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::rpc_manager::{Provider, RPCManager};
use crate::block_handler::{process_blocks, BlockHandlerInstance, ProcessBlocksInput};
use crate::cache::manager::{CacheProvider, RPCManager};
use crate::event_handler::{process_events, EventHandlerInstance, ProcessEventsInput};

use alloy::primitives::Address;
use ghost_crab_common::config::{self, Config, ConfigError};
use tokio::sync::mpsc::{self, Receiver};

use super::error::AddHandlerError;
use super::error::{Error, Result};
use super::templates::{Template, TemplateManager};

pub struct Indexer {
Expand All @@ -19,8 +19,8 @@ pub struct Indexer {
}

impl Indexer {
pub fn new() -> Result<Indexer, ConfigError> {
let (tx, rx) = mpsc::channel::<Template>(1);
pub fn new() -> core::result::Result<Indexer, ConfigError> {
let (tx, rx) = mpsc::channel::<Template>(100);

let config = config::load()?;

Expand All @@ -34,21 +34,17 @@ impl Indexer {
})
}

pub async fn load_event_handler(
&mut self,
handler: EventHandlerInstance,
) -> Result<(), AddHandlerError> {
pub async fn load_event_handler(&mut self, handler: EventHandlerInstance) -> Result<()> {
let event_config = self
.config
.data_sources
.remove(&handler.name())
.ok_or(AddHandlerError::NotFound(handler.name()))?;
.ok_or(Error::NotFound(handler.name()))?;

let provider = self.get_provider(&event_config.network).await?;

let address = str::parse::<Address>(&event_config.address).map_err(|error| {
AddHandlerError::InvalidAddress { address: event_config.address, error }
})?;
let address = str::parse::<Address>(&event_config.address)
.map_err(|error| Error::InvalidAddress(error))?;

self.handlers.push(ProcessEventsInput {
start_block: event_config.start_block,
Expand All @@ -63,15 +59,12 @@ impl Indexer {
Ok(())
}

pub async fn load_block_handler(
&mut self,
handler: BlockHandlerInstance,
) -> Result<(), AddHandlerError> {
pub async fn load_block_handler(&mut self, handler: BlockHandlerInstance) -> Result<()> {
let block_config = self
.config
.block_handlers
.remove(&handler.name())
.ok_or(AddHandlerError::NotFound(handler.name()))?;
.ok_or(Error::NotFound(handler.name()))?;

let provider = self.get_provider(&block_config.network).await?;

Expand All @@ -85,12 +78,12 @@ impl Indexer {
Ok(())
}

async fn get_provider(&mut self, network_name: &str) -> Result<CacheProvider, AddHandlerError> {
async fn get_provider(&mut self, network_name: &str) -> Result<Provider> {
let network = self
.config
.networks
.get(network_name)
.ok_or(AddHandlerError::NetworkNotFound(network_name.to_string()))?;
.ok_or(Error::NetworkNotFound(network_name.to_string()))?;

let provider = self
.rpc_manager
Expand All @@ -99,12 +92,12 @@ impl Indexer {
network.rpc_url.clone(),
network.requests_per_second,
)
.await;
.await?;

Ok(provider)
}

pub async fn start(mut self) -> Result<(), AddHandlerError> {
pub async fn start(mut self) -> Result<()> {
for block_handler in self.block_handlers.clone() {
tokio::spawn(async move {
if let Err(error) = process_blocks(block_handler).await {
Expand All @@ -127,7 +120,7 @@ impl Indexer {
.config
.templates
.get(&template.handler.name())
.ok_or(AddHandlerError::NotFound(template.handler.name()))?;
.ok_or(Error::NotFound(template.handler.name()))?;

let execution_mode = config.execution_mode.unwrap_or(config::ExecutionMode::Parallel);
let provider = self.get_provider(&config.network.clone()).await?;
Expand Down
2 changes: 2 additions & 0 deletions ghost-crab/src/indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod cache;
pub mod error;
pub mod indexer;
pub mod rpc_manager;
pub mod templates;
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use super::cache::load_cache;
use super::error::{Error, Result};
use crate::layers::cache_layer::CacheLayer;
use crate::layers::cache_layer::CacheService;
use crate::layers::rate_limit_layer::RateLimit;
use crate::layers::rate_limit_layer::RateLimitLayer;
use alloy::providers::ProviderBuilder;
use alloy::providers::RootProvider;
use alloy::rpc::client::ClientBuilder;
use alloy::transports::http::reqwest::Url;
use alloy::transports::http::{Client, Http};
use std::collections::HashMap;
use std::time::Duration;

use crate::rate_limit::RateLimit;
use crate::rate_limit::RateLimitLayer;

use super::cache::load_cache;
use super::cache_layer::CacheLayer;
use super::cache_layer::CacheService;

pub type CacheProvider = RootProvider<CacheService<RateLimit<Http<Client>>>>;
pub type Provider = RootProvider<CacheService<RateLimit<Http<Client>>>>;

pub struct RPCManager {
rpcs: HashMap<String, CacheProvider>,
rpcs: HashMap<String, Provider>,
}

impl RPCManager {
Expand All @@ -28,24 +28,22 @@ impl RPCManager {
network: String,
rpc_url: String,
rate_limit: u64,
) -> CacheProvider {
) -> Result<Provider> {
if let Some(provider) = self.rpcs.get(&rpc_url) {
return provider.clone();
return Ok(provider.clone());
}

let cache = load_cache(&network).unwrap();
let url = Url::parse(&rpc_url).map_err(|e| Error::InvalidRpcUrl(Box::new(e)))?;
let cache = load_cache(&network)?;

let cache_layer = CacheLayer::new(cache);
let rate_limit_layer = RateLimitLayer::new(rate_limit, Duration::from_secs(1));

let client = ClientBuilder::default()
.layer(cache_layer)
.layer(rate_limit_layer)
.http(rpc_url.parse().unwrap());

let client = ClientBuilder::default().layer(cache_layer).layer(rate_limit_layer).http(url);
let provider = ProviderBuilder::new().on_client(client);

self.rpcs.insert(rpc_url.clone(), provider.clone());

provider
Ok(provider)
}
}
8 changes: 4 additions & 4 deletions ghost-crab/src/latest_block_manager.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use alloy::providers::Provider;
use alloy::providers::Provider as AlloyProvider;
use alloy::transports::TransportError;
use std::time::{Duration, Instant};

use crate::cache::manager::CacheProvider;
use crate::indexer::rpc_manager::Provider;

pub struct LatestBlockManager {
provider: CacheProvider,
provider: Provider,
cache_duration: Duration,
block_number: Option<u64>,
last_fetch: Instant,
}

impl LatestBlockManager {
pub fn new(provider: CacheProvider, cache_duration: Duration) -> Self {
pub fn new(provider: Provider, cache_duration: Duration) -> Self {
Self { provider, cache_duration, block_number: None, last_fetch: Instant::now() }
}

Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions ghost-crab/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod cache_layer;
pub mod rate_limit_layer;
File renamed without changes.
3 changes: 1 addition & 2 deletions ghost-crab/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod block_handler;
pub mod cache;
pub mod event_handler;
pub mod indexer;
pub mod prelude;
Expand All @@ -8,4 +7,4 @@ pub use ghost_crab_common::config;
pub use indexer::indexer::Indexer;

mod latest_block_manager;
mod rate_limit;
mod layers;
1 change: 0 additions & 1 deletion ghost-crab/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub use std::sync::Arc;
pub use tokio;

pub use crate::block_handler::{BlockContext, BlockHandler};
pub use crate::cache;
pub use crate::config;
pub use crate::indexer;
pub use crate::indexer::templates::Template;
Expand Down

0 comments on commit 441692d

Please sign in to comment.