Skip to content

Commit

Permalink
refactor: Refactor "get_provider"
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 26, 2024
1 parent eb2c7ef commit c8ec74f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 46 deletions.
1 change: 1 addition & 0 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait BlockHandler {
fn name(&self) -> String;
}

#[derive(Clone)]
pub struct ProcessBlocksInput {
pub handler: BlockHandlerInstance,
pub templates: TemplateManager,
Expand Down
71 changes: 25 additions & 46 deletions ghost-crab/src/indexer/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::block_handler::{process_blocks, BlockHandlerInstance, ProcessBlocksInput};
use crate::cache::manager::RPCManager;
use crate::cache::manager::{CacheProvider, RPCManager};
use crate::event_handler::{process_events, EventHandlerInstance, ProcessEventsInput};

use ghost_crab_common::config::{self, Config, ConfigError};
Expand Down Expand Up @@ -43,20 +43,7 @@ impl Indexer {
.remove(&handler.name())
.ok_or(AddHandlerError::NotFound(handler.name()))?;

let network = self
.config
.networks
.get(&event_config.network)
.ok_or(AddHandlerError::NetworkNotFound(event_config.network.clone()))?;

let provider = self
.rpc_manager
.get_or_create(
event_config.network,
network.rpc_url.clone(),
network.requests_per_second,
)
.await;
let provider = self.get_provider(&event_config.network).await?;

self.handlers.push(ProcessEventsInput {
start_block: event_config.start_block,
Expand All @@ -81,41 +68,47 @@ impl Indexer {
.remove(&handler.name())
.ok_or(AddHandlerError::NotFound(handler.name()))?;

let provider = self.get_provider(&block_config.network).await?;

self.block_handlers.push(ProcessBlocksInput {
handler,
templates: self.templates.clone(),
provider,
config: block_config,
});

Ok(())
}

async fn get_provider(&mut self, network_name: &str) -> Result<CacheProvider, AddHandlerError> {
let network = self
.config
.networks
.get(&block_config.network)
.ok_or(AddHandlerError::NetworkNotFound(block_config.network.clone()))?;
.get(network_name)
.ok_or(AddHandlerError::NetworkNotFound(network_name.to_string()))?;

let provider = self
.rpc_manager
.get_or_create(
block_config.network.clone(),
network_name.to_string(),
network.rpc_url.clone(),
network.requests_per_second,
)
.await;

self.block_handlers.push(ProcessBlocksInput {
handler,
templates: self.templates.clone(),
provider,
config: block_config,
});

Ok(())
Ok(provider)
}

pub async fn start(mut self) -> Result<(), AddHandlerError> {
for block_handler in self.block_handlers {
for block_handler in self.block_handlers.clone() {
tokio::spawn(async move {
if let Err(error) = process_blocks(block_handler).await {
println!("Error processing logs for block handler: {error}");
}
});
}

for handler in self.handlers {
for handler in self.handlers.clone() {
tokio::spawn(async move {
if let Err(error) = process_events(handler).await {
println!("Error processing logs for handler: {error}");
Expand All @@ -125,26 +118,14 @@ impl Indexer {

// For dynamic sources (Templates)
while let Some(template) = self.rx.recv().await {
let template_config = self
let config = self
.config
.templates
.get(&template.handler.name())
.ok_or(AddHandlerError::NotFound(template.handler.name()))?;

let network = self
.config
.networks
.get(&template_config.network)
.ok_or(AddHandlerError::NetworkNotFound(template_config.network.clone()))?;

let provider = self
.rpc_manager
.get_or_create(
template_config.network.clone(),
network.rpc_url.clone(),
network.requests_per_second,
)
.await;
let execution_mode = config.execution_mode.unwrap_or(config::ExecutionMode::Parallel);
let provider = self.get_provider(&config.network.clone()).await?;

let handler = ProcessEventsInput {
start_block: template.start_block,
Expand All @@ -153,9 +134,7 @@ impl Indexer {
handler: template.handler,
templates: self.templates.clone(),
provider,
execution_mode: template_config
.execution_mode
.unwrap_or(config::ExecutionMode::Parallel),
execution_mode,
};

tokio::spawn(async move {
Expand Down

0 comments on commit c8ec74f

Please sign in to comment.