diff --git a/README.md b/README.md
index 7112dd31d..6092f692b 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ With QueryApi you can
 ## 🧩 Components
 
 1. [QueryApi Coordinator](./indexer)
 An Indexer that tracks changes to the QueryApi registry contract. It triggers the execution of those IndexerFunctions
-when they match new blocks by placing messages on an SQS queue. Spawns historical processing threads when needed.
+when they match new blocks by placing messages on a Redis Stream. Spawns historical processing threads when needed.
 1.a. Subfolders provide crates for the different components of the Indexer: indexer_rule_type (shared with registry contract), indexer_rules_engine, storage.
 2. [Indexer Runner](.indexer-js-queue-handler)
@@ -70,7 +70,6 @@ docker compose up
 
 ### Local Configuration
 
 - Coordinator watches the dev registry contract by default (`dev-queryapi.dataplatform.near`). To use a different contract, you can update the `REGISTRY_CONTRACT_ID` environment variable.
-- Coodinator will log SQS messages rather than sending them. To use an actual Queue, you can update the `QUEUE_URL` and `START_FROM_BLOCK_QUEUE_URL` environment variables.
 
 ### Known Issues
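For context on the new delivery mechanism described above: where the coordinator previously batched `SendMessage` calls to SQS, matched block heights are now appended to Redis Streams. A minimal sketch of the underlying operation, using the `redis` crate directly (with its async and streams support); the stream key and field name are illustrative, not the coordinator's actual `storage` API:

```rust
use redis::AsyncCommands;

// Hypothetical publisher: XADD one matched block height onto a stream.
// Returns the auto-generated entry ID ("*" asks Redis to assign one).
async fn publish_block(
    conn: &mut redis::aio::ConnectionManager,
    block_height: u64,
) -> redis::RedisResult<String> {
    conn.xadd(
        "my_indexer:real_time:stream",
        "*",
        &[("block_height", block_height.to_string())],
    )
    .await
}
```

Downstream runners can then consume entries with `XREAD`/consumer groups at their own pace, which is what replaces the SQS queue-handler lambda in this design.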
diff --git a/indexer/Cargo.lock b/indexer/Cargo.lock
index dede1fcb7..ee1cdfeb4 100644
--- a/indexer/Cargo.lock
+++ b/indexer/Cargo.lock
@@ -507,33 +507,6 @@ dependencies = [
  "url",
 ]
 
-[[package]]
-name = "aws-sdk-sqs"
-version = "0.23.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ffe1f563e227905539d5d1514e93a4c4e096366e1325ab24646783a3d6fe2c45"
-dependencies = [
- "aws-credential-types",
- "aws-endpoint",
- "aws-http",
- "aws-sig-auth",
- "aws-smithy-async",
- "aws-smithy-client",
- "aws-smithy-http",
- "aws-smithy-http-tower",
- "aws-smithy-json",
- "aws-smithy-query",
- "aws-smithy-types",
- "aws-smithy-xml",
- "aws-types",
- "bytes",
- "http",
- "regex",
- "tokio-stream",
- "tower",
- "url",
-]
-
 [[package]]
 name = "aws-sdk-sso"
 version = "0.23.0"
@@ -3353,7 +3326,6 @@ dependencies = [
  "anyhow",
  "aws-credential-types",
  "aws-sdk-s3",
- "aws-sdk-sqs",
  "aws-types",
  "base64 0.13.1",
  "borsh 0.10.3",
@@ -3377,6 +3349,7 @@ dependencies = [
  "storage",
  "tokio",
  "tokio-stream",
+ "tokio-util 0.6.10",
  "tracing",
  "tracing-subscriber 0.2.25",
  "unescape",
diff --git a/indexer/README.md b/indexer/README.md
index 4c4c99890..948a6e0f5 100644
--- a/indexer/README.md
+++ b/indexer/README.md
@@ -30,8 +30,6 @@ This project is using `workspace` feature of Cargo.
 Some tests require blocks with matching data. To download the test block, run `./download_test_blocks.sh 93085141`.
 Some other useful blocks are 80854399 92476362 93085141 93659695.
 
-To log a message instead of sending SQS messages set your `QUEUE_URL` to `MOCK` in your `.env` file.
-
 ## Design concept
 
 Identified major types of the events on the network:
@@ -46,9 +44,6 @@ Identified major types of the events on the network:
 DATABASE_URL=postgres://user:pass@host/database
 LAKE_AWS_ACCESS_KEY=AKI_LAKE_ACCESS...
 LAKE_AWS_SECRET_ACCESS_KEY=LAKE_SECRET...
-QUEUE_AWS_ACCESS_KEY=AKI_SQS_ACCESS...
-QUEUE_AWS_SECRET_ACCESS_KEY=SQS_ACCESS_SECRET
-QUEUE_URL=https://sqs.eu-central-1.amazonaws.com/754641474505/alertexer-queue
 ```
 
 ## Running locally
diff --git a/indexer/indexer_rules_engine/src/matcher.rs b/indexer/indexer_rules_engine/src/matcher.rs
index 662e51161..864422db1 100644
--- a/indexer/indexer_rules_engine/src/matcher.rs
+++ b/indexer/indexer_rules_engine/src/matcher.rs
@@ -115,8 +115,8 @@ fn match_account(
     outcome_with_receipt: &IndexerExecutionOutcomeWithReceipt,
 ) -> bool {
     match account_id {
-        x if x.contains(",") => x
-            .split(",")
+        x if x.contains(',') => x
+            .split(',')
             .any(|sub_account_id| match_account(sub_account_id.trim(), outcome_with_receipt)),
         _ => {
             wildmatch::WildMatch::new(account_id).matches(&outcome_with_receipt.receipt.receiver_id)
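The matcher hunk above only swaps string arguments for char arguments (a clippy lint); behaviour is unchanged: a comma-separated `account_id` is treated as a list of patterns, each wildcard-matched against the receipt's receiver. A self-contained sketch of that behaviour, assuming the `wildmatch` crate; the free-standing `matches` helper is hypothetical and stands in for `match_account`'s receiver check:

```rust
// Comma-separated patterns are split and each entry wildcard-matched,
// mirroring the recursive structure of match_account above.
fn matches(pattern: &str, receiver_id: &str) -> bool {
    match pattern {
        p if p.contains(',') => p
            .split(',')
            .any(|sub| matches(sub.trim(), receiver_id)),
        _ => wildmatch::WildMatch::new(pattern).matches(receiver_id),
    }
}

fn main() {
    assert!(matches("*.keypom.near, hackathon.agency.near", "v2.keypom.near"));
    assert!(!matches("*.keypom.near", "example.near"));
}
```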
diff --git a/indexer/queryapi_coordinator/Cargo.toml b/indexer/queryapi_coordinator/Cargo.toml
index 01c1c99d4..b606c1b6a 100644
--- a/indexer/queryapi_coordinator/Cargo.toml
+++ b/indexer/queryapi_coordinator/Cargo.toml
@@ -17,6 +17,7 @@ prometheus = "0.13.0"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1.0.55"
 tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
+tokio-util = "0.6.7"
 tokio-stream = { version = "0.1" }
 tracing = "0.1.34"
 
@@ -41,4 +42,3 @@ unescape = "0.1.0"
 aws-types = "0.53.0"
 aws-credential-types = "0.53.0"
 aws-sdk-s3 = "0.23.0"
-aws-sdk-sqs = "0.23.0"
diff --git a/indexer/queryapi_coordinator/README.md b/indexer/queryapi_coordinator/README.md
index a267f4ab2..9cf8bd47f 100644
--- a/indexer/queryapi_coordinator/README.md
+++ b/indexer/queryapi_coordinator/README.md
@@ -17,4 +17,3 @@ see terraform scripts https://github.com/near/near-ops/tree/master/provisioning/
 This app requires:
  * a connection to a database containing "alert" rules to match blocks against;
  * a redis server where identifiers of processed blocks are stored;
- * a SQS queue to write to.
\ No newline at end of file
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing.rs b/indexer/queryapi_coordinator/src/historical_block_processing.rs
index 82b60b8c3..82118aea3 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing.rs
@@ -1,15 +1,10 @@
-use crate::indexer_types::{IndexerFunction, IndexerQueueMessage};
-use crate::opts::{Opts, Parser};
-use crate::queue;
+use crate::indexer_types::IndexerFunction;
 use crate::s3;
 use anyhow::{bail, Context};
 use aws_sdk_s3::Client as S3Client;
-use aws_sdk_s3::Config;
-use aws_sdk_sqs::Client;
-use aws_types::SdkConfig;
 use chrono::{DateTime, LocalResult, TimeZone, Utc};
 use indexer_rule_type::indexer_rule::MatchingRule;
-use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
+use indexer_rules_engine::types::indexer_rule_match::ChainId;
 use near_jsonrpc_client::JsonRpcClient;
 use near_jsonrpc_primitives::types::blocks::RpcBlockRequest;
 use near_lake_framework::near_indexer_primitives::types::{BlockHeight, BlockId, BlockReference};
@@ -22,37 +17,94 @@ pub const INDEXED_ACTIONS_FILES_FOLDER: &str = "silver/accounts/action_receipt_a
 pub const MAX_UNINDEXED_BLOCKS_TO_PROCESS: u64 = 7200; // two hours of blocks takes ~14 minutes.
 pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
 
-pub fn spawn_historical_message_thread(
-    block_height: BlockHeight,
-    new_indexer_function: &IndexerFunction,
-    redis_connection_manager: &storage::ConnectionManager,
-) -> Option<JoinHandle<i64>> {
-    let redis_connection_manager = redis_connection_manager.clone();
-    new_indexer_function.start_block_height.map(|_| {
-        let new_indexer_function_copy = new_indexer_function.clone();
-        tokio::spawn(async move {
-            process_historical_messages_or_handle_error(
-                block_height,
-                new_indexer_function_copy,
-                Opts::parse(),
-                &redis_connection_manager,
-            )
-            .await
-        })
-    })
+pub struct Task {
+    handle: JoinHandle<()>,
+    cancellation_token: tokio_util::sync::CancellationToken,
+}
+
+/// Represents the async task used to process and push historical messages
+pub struct Streamer {
+    task: Option<Task>,
+}
+
+impl Streamer {
+    pub fn new() -> Self {
+        Streamer { task: None }
+    }
+
+    pub fn start(
+        &mut self,
+        current_block_height: BlockHeight,
+        indexer: IndexerFunction,
+        redis_connection_manager: storage::ConnectionManager,
+        s3_client: S3Client,
+        chain_id: ChainId,
+        json_rpc_client: JsonRpcClient,
+    ) -> anyhow::Result<()> {
+        if self.task.is_some() {
+            return Err(anyhow::anyhow!("Streamer has already been started",));
+        }
+
+        let cancellation_token = tokio_util::sync::CancellationToken::new();
+        let cancellation_token_clone = cancellation_token.clone();
+
+        let handle = tokio::spawn(async move {
+            tokio::select! {
+                _ = cancellation_token_clone.cancelled() => {
+                    tracing::info!(
+                        target: crate::INDEXER,
+                        "Cancelling existing historical backfill for indexer: {:?}",
+                        indexer.get_full_name(),
+                    );
+                },
+                _ = process_historical_messages_or_handle_error(
+                    current_block_height,
+                    indexer.clone(),
+                    &redis_connection_manager,
+                    &s3_client,
+                    &chain_id,
+                    &json_rpc_client,
+                ) => { }
+            }
+        });
+
+        self.task = Some(Task {
+            handle,
+            cancellation_token,
+        });
+
+        Ok(())
+    }
+
+    pub async fn cancel(&mut self) -> anyhow::Result<()> {
+        if let Some(task) = self.task.take() {
+            task.cancellation_token.cancel();
+            task.handle.await?;
+
+            return Ok(());
+        }
+
+        Err(anyhow::anyhow!(
+            "Attempted to cancel already cancelled, or not started, Streamer"
+        ))
+    }
 }
 
 pub(crate) async fn process_historical_messages_or_handle_error(
-    block_height: BlockHeight,
+    current_block_height: BlockHeight,
     indexer_function: IndexerFunction,
-    opts: Opts,
     redis_connection_manager: &storage::ConnectionManager,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
+    json_rpc_client: &JsonRpcClient,
 ) -> i64 {
     match process_historical_messages(
-        block_height,
+        current_block_height,
         indexer_function,
-        opts,
         redis_connection_manager,
+        s3_client,
+        chain_id,
+        json_rpc_client,
     )
     .await
     {
@@ -69,13 +121,15 @@ pub(crate) async fn process_historical_messages_or_handle_error(
     }
 }
 
 pub(crate) async fn process_historical_messages(
-    block_height: BlockHeight,
+    current_block_height: BlockHeight,
     indexer_function: IndexerFunction,
-    opts: Opts,
     redis_connection_manager: &storage::ConnectionManager,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
+    json_rpc_client: &JsonRpcClient,
 ) -> anyhow::Result<i64> {
     let start_block = indexer_function.start_block_height.unwrap();
-    let block_difference: i64 = (block_height - start_block) as i64;
+    let block_difference: i64 = (current_block_height - start_block) as i64;
     match block_difference {
         i64::MIN..=-1 => {
             bail!("Skipping back fill, start_block_height is greater than current block height: {:?} {:?}",
@@ -90,30 +144,20 @@ pub(crate) async fn process_historical_messages(
         1..=i64::MAX => {
             tracing::info!(
                 target: crate::INDEXER,
-                "Back filling {block_difference} blocks from {start_block} to current block height {block_height}: {:?} {:?}",
+                "Back filling {block_difference} blocks from {start_block} to current block height {current_block_height}: {:?} {:?}",
                 indexer_function.account_id,
                 indexer_function.function_name
             );
 
-            let chain_id = opts.chain_id().clone();
-            let aws_region = opts.aws_queue_region.clone();
-            let queue_client = queue::queue_client(aws_region, opts.queue_credentials());
-            let queue_url = opts.start_from_block_queue_url.clone();
-            let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
-
-            let json_rpc_client = JsonRpcClient::connect(opts.rpc_url());
             let start_date =
-                lookup_block_date_or_next_block_date(start_block, &json_rpc_client).await?;
-
-            let mut indexer_function = indexer_function.clone();
+                lookup_block_date_or_next_block_date(start_block, json_rpc_client).await?;
 
-            let last_indexed_block = last_indexed_block_from_metadata(aws_config).await?;
-            let last_indexed_block = last_indexed_block;
+            let last_indexed_block = last_indexed_block_from_metadata(s3_client).await?;
 
             let mut blocks_from_index = filter_matching_blocks_from_index_files(
                 start_block,
                 &indexer_function,
-                aws_config,
+                s3_client,
                 start_date,
             )
             .await?;
@@ -129,18 +173,21 @@ pub(crate) async fn process_historical_messages(
             let mut blocks_between_indexed_and_current_block: Vec<u64> =
                 filter_matching_unindexed_blocks_from_lake(
                     last_indexed_block,
-                    block_height,
+                    current_block_height,
                     &indexer_function,
-                    aws_config,
-                    chain_id.clone(),
+                    s3_client,
+                    chain_id,
                 )
                 .await?;
 
             blocks_from_index.append(&mut blocks_between_indexed_and_current_block);
 
-            let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);
-
             if !blocks_from_index.is_empty() {
+                storage::del(
+                    redis_connection_manager,
+                    storage::generate_historical_stream_key(&indexer_function.get_full_name()),
+                )
+                .await?;
                 storage::sadd(
                     redis_connection_manager,
                     storage::STREAMS_SET_KEY,
@@ -163,18 +210,6 @@ pub(crate) async fn process_historical_messages(
                     &[("block_height", current_block)],
                 )
                 .await?;
-
-                send_execution_message(
-                    block_height,
-                    first_block_in_index,
-                    chain_id.clone(),
-                    &queue_client,
-                    queue_url.clone(),
-                    &mut indexer_function,
-                    current_block,
-                    None,
-                )
-                .await;
             }
         }
     }
@@ -182,11 +217,9 @@ }
 
 pub(crate) async fn last_indexed_block_from_metadata(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
 ) -> anyhow::Result<BlockHeight> {
     let key = format!("{}/{}", INDEXED_ACTIONS_FILES_FOLDER, "latest_block.json");
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
     let metadata = s3::fetch_text_file_from_s3(INDEXED_DATA_FILES_BUCKET, key, s3_client).await?;
 
     let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
@@ -207,7 +240,7 @@ pub(crate) async fn last_indexed_block_from_metadata(
 pub(crate) async fn filter_matching_blocks_from_index_files(
     start_block_height: BlockHeight,
     indexer_function: &IndexerFunction,
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     start_date: DateTime<Utc>,
 ) -> anyhow::Result<Vec<BlockHeight>> {
     let s3_bucket = INDEXED_DATA_FILES_BUCKET;
@@ -224,7 +257,7 @@ pub(crate) async fn filter_matching_blocks_from_index_files(
             needs_dedupe_and_sort = true;
         }
         s3::fetch_contract_index_files(
-            aws_config,
+            s3_client,
             s3_bucket,
             INDEXED_ACTIONS_FILES_FOLDER,
             start_date,
@@ -304,12 +337,10 @@ async fn filter_matching_unindexed_blocks_from_lake(
     last_indexed_block: BlockHeight,
     ending_block_height: BlockHeight,
     indexer_function: &IndexerFunction,
-    aws_config: &SdkConfig,
-    chain_id: ChainId,
+    s3_client: &S3Client,
+    chain_id: &ChainId,
 ) -> anyhow::Result<Vec<u64>> {
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
-    let lake_bucket = lake_bucket_for_chain(chain_id.clone());
+    let lake_bucket = lake_bucket_for_chain(chain_id);
 
     let indexer_rule = &indexer_function.indexer_rule;
     let count = ending_block_height - last_indexed_block;
@@ -331,11 +362,15 @@ async fn filter_matching_unindexed_blocks_from_lake(
     for current_block in (last_indexed_block + 1)..ending_block_height {
         // fetch block file from S3
         let key = format!("{}/block.json", normalize_block_height(current_block));
-        let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await;
+        let s3_result = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await;
         if s3_result.is_err() {
             let error = s3_result.err().unwrap();
-            if let Some(_) = error.downcast_ref::<aws_sdk_s3::error::NoSuchKey>() {
+            if error
+                .root_cause()
+                .downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
+                .is_some()
+            {
                 tracing::info!(
                     target: crate::INDEXER,
                     "In manual filtering, skipping block number {} which was not found. For function {:?} {:?}",
@@ -362,7 +397,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
                 normalize_block_height(current_block),
                 shard_id
             );
-            let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client.clone()).await?;
+            let shard = s3::fetch_text_file_from_s3(&lake_bucket, key, s3_client).await?;
             match serde_json::from_slice::<near_lake_framework::near_indexer_primitives::IndexerShard>(
                 shard.as_ref(),
             ) {
@@ -401,7 +436,7 @@ async fn filter_matching_unindexed_blocks_from_lake(
     Ok(blocks_to_process)
 }
 
-fn lake_bucket_for_chain(chain_id: ChainId) -> String {
+fn lake_bucket_for_chain(chain_id: &ChainId) -> String {
     format!("{}{}", LAKE_BUCKET_PREFIX, chain_id)
 }
 
@@ -409,42 +444,6 @@ fn normalize_block_height(block_height: BlockHeight) -> String {
     format!("{:0>12}", block_height)
 }
 
-async fn send_execution_message(
-    block_height: BlockHeight,
-    first_block: BlockHeight,
-    chain_id: ChainId,
-    queue_client: &Client,
-    queue_url: String,
-    indexer_function: &mut IndexerFunction,
-    current_block: u64,
-    payload: Option<IndexerRuleMatchPayload>,
-) {
-    // only request provisioning on the first block
-    if current_block != first_block {
-        indexer_function.provisioned = true;
-    }
-
-    let msg = IndexerQueueMessage {
-        chain_id,
-        indexer_rule_id: 0,
-        indexer_rule_name: indexer_function.function_name.clone(),
-        payload,
-        block_height: current_block,
-        indexer_function: indexer_function.clone(),
-        is_historical: true,
-    };
-
-    match queue::send_to_indexer_queue(queue_client, queue_url, vec![msg]).await {
-        Ok(_) => {}
-        Err(err) => tracing::error!(
-            target: crate::INDEXER,
-            "#{} an error occurred when sending messages to the queue\n{:#?}",
-            block_height,
-            err
-        ),
-    }
-}
-
 // if block does not exist, try next block, up to MAX_RPC_BLOCKS_TO_PROCESS (20) blocks
 pub async fn lookup_block_date_or_next_block_date(
     block_height: u64,
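The new `Streamer` above pairs each historical backfill task with a `tokio_util::sync::CancellationToken`, so a later registration can cancel and cleanly replace an in-flight backfill. A minimal sketch of that `select!`-based pattern under the same dependencies the diff adds (`tokio`, `tokio-util` 0.6+); `backfill` is a stand-in for `process_historical_messages_or_handle_error`:

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Stand-in for the long-running historical processing task.
async fn backfill() {
    tokio::time::sleep(Duration::from_secs(3600)).await;
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // The task races the work against the cancellation signal,
    // exactly as Streamer::start does above.
    let handle = tokio::spawn(async move {
        tokio::select! {
            _ = token_clone.cancelled() => println!("backfill cancelled"),
            _ = backfill() => println!("backfill finished"),
        }
    });

    token.cancel();
    // Awaiting the JoinHandle after cancel guarantees the task has exited,
    // which is why Streamer::cancel awaits the handle too.
    handle.await.unwrap();
}
```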
diff --git a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
index 69ca67af9..86c281bb5 100644
--- a/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
+++ b/indexer/queryapi_coordinator/src/historical_block_processing_integration_tests.rs
@@ -20,11 +20,6 @@ mod tests {
             redis_connection_string: env::var("REDIS_CONNECTION_STRING").unwrap(),
             lake_aws_access_key,
             lake_aws_secret_access_key,
-            queue_aws_access_key: "".to_string(),
-            queue_aws_secret_access_key: "".to_string(),
-            aws_queue_region: "".to_string(),
-            queue_url: "MOCK".to_string(),
-            start_from_block_queue_url: "MOCK".to_string(),
             registry_contract_id: "".to_string(),
             port: 0,
             chain_id: ChainId::Mainnet(StartOptions::FromLatest),
@@ -38,9 +33,11 @@ mod tests {
     async fn test_indexing_metadata_file() {
         let opts = Opts::test_opts_with_aws();
         let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let last_indexed_block =
-            historical_block_processing::last_indexed_block_from_metadata(aws_config)
+            historical_block_processing::last_indexed_block_from_metadata(&s3_client)
                 .await
                 .unwrap();
         let a: Range<u64> = 90000000..9000000000; // valid for the next 300 years
@@ -75,19 +72,28 @@ mod tests {
         };
 
         let opts = Opts::test_opts_with_aws();
+        let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
+
         let redis_connection_manager = storage::connect(&opts.redis_connection_string)
             .await
             .unwrap();
+
+        let json_rpc_client = near_jsonrpc_client::JsonRpcClient::connect(opts.rpc_url());
+
         let fake_block_height =
-            historical_block_processing::last_indexed_block_from_metadata(aws_config)
+            historical_block_processing::last_indexed_block_from_metadata(&s3_client)
                 .await
                 .unwrap();
         let result = historical_block_processing::process_historical_messages(
             fake_block_height + 1,
             indexer_function,
-            opts,
             &redis_connection_manager,
+            &s3_client,
+            &opts.chain_id(),
+            &json_rpc_client,
         )
         .await;
         assert!(result.unwrap() > 0);
@@ -120,9 +126,11 @@ mod tests {
 
         let opts = Opts::test_opts_with_aws();
         let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let start_block_height = 77016214;
-        let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 03)
+        let naivedatetime_utc = NaiveDate::from_ymd_opt(2022, 10, 3)
             .unwrap()
             .and_hms_opt(0, 0, 0)
             .unwrap();
@@ -130,7 +138,7 @@ mod tests {
         let blocks = filter_matching_blocks_from_index_files(
             start_block_height,
             &indexer_function,
-            aws_config,
+            &s3_client,
             datetime_utc,
         )
         .await;
@@ -145,7 +153,7 @@ mod tests {
             }
             Err(e) => {
                 println!("Error: {:?}", e);
-                assert!(false);
+                panic!();
             }
         }
     }
@@ -177,9 +185,11 @@ mod tests {
 
         let opts = Opts::test_opts_with_aws();
         let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let start_block_height = 45894620;
-        let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 08, 01)
+        let naivedatetime_utc = NaiveDate::from_ymd_opt(2021, 8, 1)
             .unwrap()
             .and_hms_opt(0, 0, 0)
             .unwrap();
@@ -187,7 +197,7 @@ mod tests {
         let blocks = filter_matching_blocks_from_index_files(
             start_block_height,
             &indexer_function,
-            aws_config,
+            &s3_client,
             datetime_utc,
         )
         .await;
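The tests above now build an `aws-sdk-s3` client once from the shared NEAR Lake `SdkConfig` instead of threading the config through every call. The two-line construction pattern they repeat could be captured in a helper like the following sketch (the helper name is hypothetical; the calls are exactly those used in the diff):

```rust
use aws_types::sdk_config::SdkConfig;

// Hypothetical helper: build an S3 client from the shared Lake SdkConfig.
fn s3_client_from_lake_config(aws_config: &SdkConfig) -> aws_sdk_s3::Client {
    let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
    aws_sdk_s3::Client::from_conf(s3_config)
}
```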
diff --git a/indexer/queryapi_coordinator/src/indexer_registry.rs b/indexer/queryapi_coordinator/src/indexer_registry.rs
index 2c79cc70c..ddf8d0593 100644
--- a/indexer/queryapi_coordinator/src/indexer_registry.rs
+++ b/indexer/queryapi_coordinator/src/indexer_registry.rs
@@ -8,8 +8,6 @@ use near_lake_framework::near_indexer_primitives::views::QueryRequest;
 use serde_json::{json, Value};
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
-use tokio::sync::MutexGuard;
-use tokio::task::JoinHandle;
 use unescape::unescape;
 
 use crate::indexer_reducer;
@@ -17,16 +15,6 @@ use crate::indexer_reducer::FunctionCallInfo;
 use crate::indexer_types::{IndexerFunction, IndexerRegistry};
 use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status};
 
-pub(crate) fn registry_as_vec_of_indexer_functions(
-    registry: &IndexerRegistry,
-) -> Vec<IndexerFunction> {
-    registry
-        .values()
-        .flat_map(|fns| fns.values())
-        .cloned()
-        .collect()
-}
-
 struct RegistryFunctionInvocation {
     pub account_id: AccountId,
     pub function_name: String,
@@ -99,20 +87,18 @@ pub(crate) fn build_registry_from_json(raw_registry: Value) -> IndexerRegistry {
 
 /// Returns spawned start_from_block threads
 pub(crate) async fn index_registry_changes(
-    block_height: BlockHeight,
-    registry: &mut MutexGuard<'_, IndexerRegistry>,
+    current_block_height: BlockHeight,
     context: &QueryApiContext<'_>,
-) -> Vec<JoinHandle<i64>> {
-    index_and_process_remove_calls(registry, context);
+) -> anyhow::Result<()> {
+    index_and_process_remove_calls(context).await;
 
-    index_and_process_register_calls(block_height, registry, context)
+    index_and_process_register_calls(current_block_height, context).await
 }
 
-fn index_and_process_register_calls(
-    block_height: BlockHeight,
-    registry: &mut MutexGuard<IndexerRegistry>,
-    context: &QueryApiContext,
-) -> Vec<JoinHandle<i64>> {
+async fn index_and_process_register_calls(
+    current_block_height: BlockHeight,
+    context: &QueryApiContext<'_>,
+) -> anyhow::Result<()> {
     let registry_method_name = "register_indexer_function";
     let registry_calls_rule =
         build_registry_indexer_rule(registry_method_name, context.registry_contract_id);
@@ -122,7 +108,6 @@ fn index_and_process_register_calls(
         context.chain_id,
         context.streamer_message.block.header.height,
     );
-    let mut spawned_start_from_block_threads = Vec::new();
 
     if !registry_updates.is_empty() {
         for update in registry_updates {
@@ -134,7 +119,8 @@ fn index_and_process_register_calls(
             match new_indexer_function {
                 None => continue,
                 Some(mut new_indexer_function) => {
-                    let fns = registry
+                    let mut indexer_registry_lock = context.indexer_registry.lock().await;
+                    let fns = indexer_registry_lock
                         .entry(new_indexer_function.account_id.clone())
                         .or_default();
 
@@ -145,7 +131,7 @@ fn index_and_process_register_calls(
                             tracing::info!(
                                 target: crate::INDEXER,
                                 "Block {}. Indexed creation call to {registry_method_name}: {:?} {:?}",
-                                block_height,
+                                current_block_height,
                                 new_indexer_function.account_id.clone(),
                                 new_indexer_function.function_name.clone()
                             );
@@ -156,7 +142,7 @@ fn index_and_process_register_calls(
                             tracing::info!(
                                 target: crate::INDEXER,
                                 "Block {}. Indexed update call to {registry_method_name}: {:?} {:?}",
-                                block_height,
+                                current_block_height,
                                 new_indexer_function.account_id.clone(),
                                 new_indexer_function.function_name.clone(),
                             );
@@ -168,15 +154,26 @@ fn index_and_process_register_calls(
                     }
 
                     if new_indexer_function.start_block_height.is_some() {
-                        if let Some(thread) =
-                            crate::historical_block_processing::spawn_historical_message_thread(
-                                block_height,
-                                &mut new_indexer_function,
-                                context.redis_connection_manager,
-                            )
+                        let mut streamers_lock = context.streamers.lock().await;
+
+                        if let Some(mut existing_streamer) =
+                            streamers_lock.remove(&new_indexer_function.get_full_name())
                         {
-                            spawned_start_from_block_threads.push(thread);
+                            existing_streamer.cancel().await?;
                         }
+
+                        let mut streamer = crate::historical_block_processing::Streamer::new();
+
+                        streamer.start(
+                            current_block_height,
+                            new_indexer_function.clone(),
+                            context.redis_connection_manager.clone(),
+                            context.s3_client.clone(),
+                            context.chain_id.clone(),
+                            context.json_rpc_client.clone(),
+                        )?;
+
+                        streamers_lock.insert(new_indexer_function.get_full_name(), streamer);
                     }
 
                     fns.insert(update.method_name.clone(), new_indexer_function);
@@ -184,13 +181,11 @@ fn index_and_process_register_calls(
             };
         }
     }
-    spawned_start_from_block_threads
+
+    Ok(())
 }
 
-fn index_and_process_remove_calls(
-    registry: &mut MutexGuard<IndexerRegistry>,
-    context: &QueryApiContext,
-) {
+async fn index_and_process_remove_calls(context: &QueryApiContext<'_>) {
     let registry_method_name = "remove_indexer_function";
     let registry_calls_rule =
         build_registry_indexer_rule(registry_method_name, context.registry_contract_id);
@@ -217,7 +212,12 @@ fn index_and_process_remove_calls(
                 function_invocation.account_id.clone(),
                 function_invocation.function_name.clone(),
             );
-            match registry.entry(function_invocation.account_id.clone()) {
+            match context
+                .indexer_registry
+                .lock()
+                .await
+                .entry(function_invocation.account_id.clone())
+            {
                 Entry::Vacant(_) => {}
                 Entry::Occupied(mut fns) => {
                     fns.get_mut()
diff --git a/indexer/queryapi_coordinator/src/indexer_types.rs b/indexer/queryapi_coordinator/src/indexer_types.rs
index f3880320b..ea075866f 100644
--- a/indexer/queryapi_coordinator/src/indexer_types.rs
+++ b/indexer/queryapi_coordinator/src/indexer_types.rs
@@ -1,28 +1,9 @@
 use indexer_rule_type::indexer_rule::IndexerRule;
-use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatchPayload};
 use near_lake_framework::near_indexer_primitives::types::AccountId;
 use std::collections::HashMap;
 
 pub type IndexerRegistry = HashMap<AccountId, HashMap<String, IndexerFunction>>;
 
-#[derive(
-    borsh::BorshSerialize,
-    borsh::BorshDeserialize,
-    serde::Serialize,
-    serde::Deserialize,
-    Clone,
-    Debug,
-)]
-pub struct IndexerQueueMessage {
-    pub chain_id: ChainId,
-    pub indexer_rule_id: u32,
-    pub indexer_rule_name: String,
-    pub payload: Option<IndexerRuleMatchPayload>,
-    pub block_height: u64,
-    pub indexer_function: IndexerFunction,
-    pub is_historical: bool,
-}
-
 #[derive(
     borsh::BorshSerialize,
     borsh::BorshDeserialize,
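As the type alias above shows, `IndexerRegistry` is a two-level map: account ID to function name to definition. A toy sketch of that shape and the `entry(...).or_default()` insertion pattern used by `index_and_process_register_calls`; `String` and the stub struct stand in for the real `AccountId` and `IndexerFunction` types:

```rust
use std::collections::HashMap;

// Simplified stand-in for the registry's two-level map.
type Registry = HashMap<String, HashMap<String, IndexerFunction>>;

#[derive(Debug)]
struct IndexerFunction {
    provisioned: bool,
}

fn main() {
    let mut registry: Registry = HashMap::new();

    // Insert under account, creating the inner map on first use.
    registry
        .entry("dev.near".to_string())
        .or_default()
        .insert("my_indexer".to_string(), IndexerFunction { provisioned: false });

    // Lookup goes account -> function name -> definition.
    let f = &registry["dev.near"]["my_indexer"];
    println!("{f:?}");
}
```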
diff --git a/indexer/queryapi_coordinator/src/main.rs b/indexer/queryapi_coordinator/src/main.rs
index a67f5653a..fe88af9c7 100644
--- a/indexer/queryapi_coordinator/src/main.rs
+++ b/indexer/queryapi_coordinator/src/main.rs
@@ -1,15 +1,16 @@
-use cached::SizedCache;
+use std::collections::HashMap;
+
 use futures::stream::{self, StreamExt};
 use near_jsonrpc_client::JsonRpcClient;
-use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::Mutex;
 
 use indexer_rules_engine::types::indexer_rule_match::{ChainId, IndexerRuleMatch};
-use near_lake_framework::near_indexer_primitives::types::{AccountId, BlockHeight};
-use near_lake_framework::near_indexer_primitives::{types, StreamerMessage};
+use near_lake_framework::near_indexer_primitives::types::BlockHeight;
+use near_lake_framework::near_indexer_primitives::StreamerMessage;
 use utils::serialize_to_camel_case_json_string;
 
 use crate::indexer_types::IndexerFunction;
-use indexer_types::{IndexerQueueMessage, IndexerRegistry};
+use indexer_types::IndexerRegistry;
 use opts::{Opts, Parser};
 use storage::{self, generate_real_time_streamer_message_key, ConnectionManager};
 
@@ -19,33 +20,24 @@ mod indexer_registry;
 mod indexer_types;
 mod metrics;
 mod opts;
-mod queue;
 mod s3;
 mod utils;
 
 pub(crate) const INDEXER: &str = "queryapi_coordinator";
-pub(crate) const INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
-pub(crate) const MAX_DELAY_TIME: std::time::Duration = std::time::Duration::from_millis(4000);
-pub(crate) const RETRY_COUNT: usize = 2;
 
 type SharedIndexerRegistry = std::sync::Arc<Mutex<IndexerRegistry>>;
 
-#[derive(Debug, Default, Clone, Copy)]
-pub struct BalanceDetails {
-    pub non_staked: types::Balance,
-    pub staked: types::Balance,
-}
-
-pub type BalanceCache = std::sync::Arc<Mutex<SizedCache<AccountId, BalanceDetails>>>;
+type Streamers = std::sync::Arc<Mutex<HashMap<String, historical_block_processing::Streamer>>>;
 
 pub(crate) struct QueryApiContext<'a> {
     pub streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
     pub chain_id: &'a ChainId,
-    pub queue_client: &'a queue::QueueClient,
-    pub queue_url: &'a str,
+    pub s3_client: &'a aws_sdk_s3::Client,
+    pub json_rpc_client: &'a JsonRpcClient,
     pub registry_contract_id: &'a str,
-    pub balance_cache: &'a BalanceCache,
     pub redis_connection_manager: &'a ConnectionManager,
+    pub indexer_registry: &'a SharedIndexerRegistry,
+    pub streamers: &'a Streamers,
 }
 
 #[tokio::main]
@@ -57,14 +49,11 @@ async fn main() -> anyhow::Result<()> {
     let opts = Opts::parse();
 
     let chain_id = &opts.chain_id();
-    let aws_region = opts.aws_queue_region.clone();
-    let queue_client = queue::queue_client(aws_region, opts.queue_credentials());
-    let queue_url = opts.queue_url.clone();
     let registry_contract_id = opts.registry_contract_id.clone();
 
-    // We want to prevent unnecessary RPC queries to find previous balance
-    let balances_cache: BalanceCache =
-        std::sync::Arc::new(Mutex::new(SizedCache::with_size(100_000)));
+    let aws_config = &opts.lake_aws_sdk_config();
+    let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+    let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
     tracing::info!(target: INDEXER, "Connecting to redis...");
     let redis_connection_manager = storage::connect(&opts.redis_connection_string).await?;
@@ -86,6 +75,8 @@ async fn main() -> anyhow::Result<()> {
     let indexer_registry: SharedIndexerRegistry =
         std::sync::Arc::new(Mutex::new(indexer_functions));
 
+    let streamers = std::sync::Arc::new(tokio::sync::Mutex::new(HashMap::new()));
+
     tracing::info!(target: INDEXER, "Generating LakeConfig...");
     let config: near_lake_framework::LakeConfig = opts.to_lake_config().await;
 
@@ -100,14 +91,16 @@ async fn main() -> anyhow::Result<()> {
         .map(|streamer_message| {
             let context = QueryApiContext {
                 redis_connection_manager: &redis_connection_manager,
-                queue_url: &queue_url,
-                balance_cache: &balances_cache,
                 registry_contract_id: &registry_contract_id,
                 streamer_message,
                 chain_id,
-                queue_client: &queue_client,
+                json_rpc_client: &json_rpc_client,
+                s3_client: &s3_client,
+                indexer_registry: &indexer_registry,
+                streamers: &streamers,
             };
-            handle_streamer_message(context, indexer_registry.clone())
+
+            handle_streamer_message(context)
         })
        .buffer_unordered(1usize);
 
@@ -126,13 +119,15 @@ async fn main() -> anyhow::Result<()> {
     }
 }
 
-async fn handle_streamer_message(
-    context: QueryApiContext<'_>,
-    indexer_registry: SharedIndexerRegistry,
-) -> anyhow::Result<u64> {
-    let mut indexer_registry_locked = indexer_registry.lock().await;
-    let indexer_functions =
-        indexer_registry::registry_as_vec_of_indexer_functions(&indexer_registry_locked);
+async fn handle_streamer_message(context: QueryApiContext<'_>) -> anyhow::Result<u64> {
+    let indexer_functions = {
+        let lock = context.indexer_registry.lock().await;
+
+        lock.values()
+            .flat_map(|fns| fns.values())
+            .cloned()
+            .collect::<Vec<_>>()
+    };
 
     let mut indexer_function_filter_matches_futures = stream::iter(indexer_functions.iter())
         .map(|indexer_function| {
@@ -156,19 +151,7 @@ async fn handle_streamer_message(
     )
     .await?;
 
-    let spawned_indexers = indexer_registry::index_registry_changes(
-        block_height,
-        &mut indexer_registry_locked,
-        &context,
-    )
-    .await;
-    if !spawned_indexers.is_empty() {
-        tracing::info!(
-            target: INDEXER,
-            "Spawned {} historical backfill indexers",
-            spawned_indexers.len()
-        );
-    }
+    indexer_registry::index_registry_changes(block_height, &context).await?;
 
     while let Some(indexer_function_with_matches) =
         indexer_function_filter_matches_futures.next().await
@@ -177,9 +160,7 @@ async fn handle_streamer_message(
         let indexer_function = indexer_function_with_matches.indexer_function;
         let indexer_rule_matches = indexer_function_with_matches.matches;
 
-        let mut indexer_function_messages: Vec<IndexerQueueMessage> = Vec::new();
-
-        for indexer_rule_match in indexer_rule_matches.iter() {
+        for _ in indexer_rule_matches.iter() {
             tracing::debug!(
                 target: INDEXER,
                 "Matched filter {:?} for function {} {}",
@@ -188,22 +169,8 @@ async fn handle_streamer_message(
                 indexer_function.function_name,
             );
 
-            let msg = IndexerQueueMessage {
-                chain_id: indexer_rule_match.chain_id.clone(),
-                indexer_rule_id: indexer_rule_match.indexer_rule_id.unwrap_or(0),
-                indexer_rule_name: indexer_rule_match
-                    .indexer_rule_name
-                    .clone()
-                    .unwrap_or("".to_string()),
-                payload: Some(indexer_rule_match.payload.clone()),
-                block_height,
-                indexer_function: indexer_function.clone(),
-                is_historical: false,
-            };
-            indexer_function_messages.push(msg);
-
             if !indexer_function.provisioned {
-                set_provisioned_flag(&mut indexer_registry_locked, &indexer_function);
+                set_provisioned_flag(context.indexer_registry, indexer_function).await;
             }
 
             storage::sadd(
@@ -226,27 +193,6 @@ async fn handle_streamer_message(
             )
             .await?;
         }
-
-        stream::iter(indexer_function_messages.into_iter())
-            .chunks(10)
-            .for_each(|indexer_queue_messages_batch| async {
-                match queue::send_to_indexer_queue(
-                    context.queue_client,
-                    context.queue_url.to_string(),
-                    indexer_queue_messages_batch,
-                )
-                .await
-                {
-                    Ok(_) => {}
-                    Err(err) => tracing::error!(
-                        target: INDEXER,
-                        "#{} an error occurred during sending messages to the queue\n{:#?}",
-                        context.streamer_message.block.header.height,
-                        err
-                    ),
-                }
-            })
-            .await;
     }
 }
 
@@ -271,11 +217,15 @@
     Ok(context.streamer_message.block.header.height)
 }
 
-fn set_provisioned_flag(
-    indexer_registry_locked: &mut MutexGuard<IndexerRegistry>,
+async fn set_provisioned_flag(
+    indexer_registry: &SharedIndexerRegistry,
     indexer_function: &IndexerFunction,
 ) {
-    match indexer_registry_locked.get_mut(&indexer_function.account_id) {
+    match indexer_registry
+        .lock()
+        .await
+        .get_mut(&indexer_function.account_id)
+    {
         Some(account_functions) => {
             match account_functions.get_mut(&indexer_function.function_name) {
                 Some(indexer_function) => {
@@ -332,48 +282,53 @@ async fn reduce_rule_matches_for_indexer_function<'x>(
 #[cfg(test)]
 mod historical_block_processing_integration_tests;
 
-use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status};
-use std::collections::HashMap;
-
-#[tokio::test]
-async fn set_provisioning_finds_functions_in_registry() {
-    let mut indexer_registry = IndexerRegistry::new();
-    let indexer_function = IndexerFunction {
-        account_id: "test_near".to_string().parse().unwrap(),
-        function_name: "test_indexer".to_string(),
-        code: "".to_string(),
-        start_block_height: None,
-        schema: None,
-        provisioned: false,
-        indexer_rule: IndexerRule {
-            indexer_rule_kind: IndexerRuleKind::Action,
-            id: None,
-            name: None,
-            matching_rule: MatchingRule::ActionAny {
-                affected_account_id: "social.near".to_string(),
-                status: Status::Success,
-            },
-        },
-    };
-
-    let mut functions: HashMap<String, IndexerFunction> = HashMap::new();
-    functions.insert(
-        indexer_function.function_name.clone(),
-        indexer_function.clone(),
-    );
-    indexer_registry.insert(indexer_function.account_id.clone(), functions);
-
-    let indexer_registry: SharedIndexerRegistry = std::sync::Arc::new(Mutex::new(indexer_registry));
-    let mut indexer_registry_locked = indexer_registry.lock().await;
-
-    set_provisioned_flag(&mut indexer_registry_locked, &&indexer_function);
-
-    let account_functions = indexer_registry_locked
-        .get(&indexer_function.account_id)
-        .unwrap();
-    let indexer_function = account_functions
-        .get(&indexer_function.function_name)
-        .unwrap();
-
-    assert!(indexer_function.provisioned);
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use indexer_rule_type::indexer_rule::{IndexerRule, IndexerRuleKind, MatchingRule, Status};
+    use std::collections::HashMap;
+
+    #[tokio::test]
+    async fn set_provisioning_finds_functions_in_registry() {
+        let mut indexer_registry = IndexerRegistry::new();
+        let indexer_function = IndexerFunction {
+            account_id: "test_near".to_string().parse().unwrap(),
+            function_name: "test_indexer".to_string(),
+            code: "".to_string(),
+            start_block_height: None,
+            schema: None,
+            provisioned: false,
+            indexer_rule: IndexerRule {
+                indexer_rule_kind: IndexerRuleKind::Action,
+                id: None,
+                name: None,
+                matching_rule: MatchingRule::ActionAny {
+                    affected_account_id: "social.near".to_string(),
+                    status: Status::Success,
+                },
+            },
+        };
+
+        let mut functions: HashMap<String, IndexerFunction> = HashMap::new();
+        functions.insert(
+            indexer_function.function_name.clone(),
+            indexer_function.clone(),
+        );
+        indexer_registry.insert(indexer_function.account_id.clone(), functions);
+
+        let indexer_registry: SharedIndexerRegistry =
+            std::sync::Arc::new(Mutex::new(indexer_registry));
+        let mut indexer_registry_locked = indexer_registry.lock().await;
+
+        set_provisioned_flag(&indexer_registry, &indexer_function);
+
+        let account_functions = indexer_registry_locked
+            .get(&indexer_function.account_id)
+            .unwrap();
+        let indexer_function = account_functions
+            .get(&indexer_function.function_name)
+            .unwrap();
+
+        assert!(indexer_function.provisioned);
+    }
 }
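One design point worth calling out in the main.rs changes: `handle_streamer_message` no longer holds a `MutexGuard` for the whole function. It snapshots the registry inside a block so the guard is dropped before any `await`, which avoids contending with the registry updates and streamer management that now also take the lock. A minimal sketch of that lock-scoping pattern, with a simplified registry type:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Clone what you need inside a block so the guard drops before awaiting.
async fn snapshot(registry: &Arc<Mutex<HashMap<String, u64>>>) -> Vec<u64> {
    let values = {
        let lock = registry.lock().await;
        lock.values().cloned().collect::<Vec<_>>()
    }; // guard dropped here; safe to hold `values` across later awaits

    values
}
```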
diff --git a/indexer/queryapi_coordinator/src/opts.rs b/indexer/queryapi_coordinator/src/opts.rs
index af0fd4a11..b99d580c8 100644
--- a/indexer/queryapi_coordinator/src/opts.rs
+++ b/indexer/queryapi_coordinator/src/opts.rs
@@ -26,21 +26,6 @@ pub struct Opts {
     #[clap(long, env)]
     /// AWS Secret Access Key with the rights to read from AWS S3
     pub lake_aws_secret_access_key: String,
-    /// AWS Access Key with the rights to send messages to the `--queue-url`
-    #[clap(long, env)]
-    pub queue_aws_access_key: String,
-    /// AWS Secret Access Key with the rights to send messages to the `--queue-url`
-    #[clap(long, env)]
-    pub queue_aws_secret_access_key: String,
-    /// Which AWS region to use with the `--queue-url`
-    #[clap(long, env)]
-    pub aws_queue_region: String,
-    /// URL to the main AWS SQS queue backed by Queue Handler lambda
-    #[clap(long, env)]
-    pub queue_url: String,
-    /// URL to the AWS SQS queue for processing historical data
-    #[clap(long, env)]
-    pub start_from_block_queue_url: String,
     /// Registry contract to use
     #[clap(env)]
     pub registry_contract_id: String,
@@ -61,6 +46,7 @@ pub enum ChainId {
 }
 
 #[derive(Subcommand, Debug, Clone)]
+#[allow(clippy::enum_variant_names)]
 pub enum StartOptions {
     FromBlock { height: u64 },
     FromInterruption,
@@ -98,18 +84,6 @@ impl Opts {
         aws_credential_types::provider::SharedCredentialsProvider::new(provider)
     }
 
-    // Creates AWS Credentials for SQS Queue
-    pub fn queue_credentials(&self) -> aws_credential_types::provider::SharedCredentialsProvider {
-        let provider = aws_credential_types::Credentials::new(
-            self.queue_aws_access_key.clone(),
-            self.queue_aws_secret_access_key.clone(),
-            None,
-            None,
-            "queryapi_coordinator_queue",
-        );
-        aws_credential_types::provider::SharedCredentialsProvider::new(provider)
-    }
-
     /// Creates AWS Shared Config for NEAR Lake
     pub fn lake_aws_sdk_config(&self) -> aws_types::sdk_config::SdkConfig {
         aws_types::sdk_config::SdkConfig::builder()
diff --git a/indexer/queryapi_coordinator/src/queue.rs b/indexer/queryapi_coordinator/src/queue.rs
deleted file mode 100644
index 1ff6fa5cb..000000000
--- a/indexer/queryapi_coordinator/src/queue.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-use crate::indexer_types::IndexerQueueMessage;
-use aws_credential_types::provider::SharedCredentialsProvider;
-pub use aws_sdk_sqs::{
-    error::SendMessageError, model::SendMessageBatchRequestEntry, Client as QueueClient, Region,
-};
-
-pub const MOCK_QUEUE_URL: &str = "MOCK";
-
-/// Creates AWS SQS Client for QueryApi SQS
-pub fn queue_client(region: String, credentials: SharedCredentialsProvider) -> aws_sdk_sqs::Client {
-    let shared_config = queue_aws_sdk_config(region, credentials);
-    aws_sdk_sqs::Client::new(&shared_config)
-}
-
-/// Creates AWS Shared Config for QueryApi SQS queue
-pub fn queue_aws_sdk_config(
-    region: String,
-    credentials: SharedCredentialsProvider,
-) -> aws_types::sdk_config::SdkConfig {
-    aws_types::sdk_config::SdkConfig::builder()
-        .credentials_provider(credentials)
-        .region(aws_types::region::Region::new(region))
-        .build()
-}
-
-pub async fn send_to_indexer_queue(
-    client: &aws_sdk_sqs::Client,
-    queue_url: String,
-    indexer_queue_messages: Vec<IndexerQueueMessage>,
-) -> anyhow::Result<()> {
-    if queue_url == MOCK_QUEUE_URL {
-        for m in &indexer_queue_messages {
-            tracing::info!(
-                "Mock sending messages to SQS: {:?} {:?}",
-                m.indexer_function.function_name,
-                m.block_height
-            );
-        }
-        return Ok(());
-    }
-
-    let message_bodies: Vec<SendMessageBatchRequestEntry> = indexer_queue_messages
-        .into_iter()
-        .enumerate()
-        .map(|(index, indexer_queue_message)| {
-            SendMessageBatchRequestEntry::builder()
-                .id(index.to_string())
-                .message_body(
-                    serde_json::to_string(&indexer_queue_message)
-                        .expect("Failed to Json Serialize IndexerQueueMessage"),
-                )
-                .message_group_id(format!(
-                    "{}_{}",
-                    indexer_queue_message.indexer_function.account_id,
-                    indexer_queue_message.indexer_function.function_name
-                ))
-                .build()
-        })
-        .collect();
-
-    let rsp = client
-        .send_message_batch()
-        .queue_url(queue_url)
-        .set_entries(Some(message_bodies))
-        .send()
-        .await?;
-    tracing::debug!(
-        target: crate::INDEXER,
-        "Response from sending a message to SQS\n{:#?}",
-        rsp
-    );
-    Ok(())
-}
diff --git a/indexer/queryapi_coordinator/src/s3.rs b/indexer/queryapi_coordinator/src/s3.rs
index f88323688..9b7e8696b 100644
--- a/indexer/queryapi_coordinator/src/s3.rs
+++ b/indexer/queryapi_coordinator/src/s3.rs
@@ -1,7 +1,5 @@
 use anyhow::{bail, Context, Result};
 use aws_sdk_s3::Client as S3Client;
-use aws_sdk_s3::Config;
-use aws_types::SdkConfig;
 use chrono::{DateTime, NaiveDate, Utc};
 use futures::future::try_join_all;
 
@@ -17,7 +15,7 @@ fn storage_path_for_account(account: &str) -> String {
 }
 
 pub async fn find_index_files_by_pattern(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     s3_bucket: &str,
     s3_folder: &str,
     pattern: &str,
@@ -29,10 +27,10 @@ pub async fn find_index_files_by_pattern(
         for account in account_array {
             let account = account.trim();
             let sub_results = if account.contains('*') {
-                list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &account).await?
+                list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &account).await?
             } else {
                 list_s3_bucket_by_prefix(
-                    aws_config,
+                    s3_client,
                     s3_bucket,
                     &format!("{}/{}/", s3_folder, storage_path_for_account(account)),
                 )
@@ -43,11 +41,11 @@ pub async fn find_index_files_by_pattern(
             results
         }
         x if x.contains('*') => {
-            list_index_files_by_wildcard(aws_config, s3_bucket, s3_folder, &x).await?
+            list_index_files_by_wildcard(s3_client, s3_bucket, s3_folder, &x).await?
         }
         _ => {
             list_s3_bucket_by_prefix(
-                aws_config,
+                s3_client,
                 s3_bucket,
                 &format!("{}/{}/", s3_folder, storage_path_for_account(pattern),),
             )
@@ -57,7 +55,7 @@ pub async fn find_index_files_by_pattern(
 }
 
 async fn list_index_files_by_wildcard(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     s3_bucket: &str,
     s3_folder: &str,
     pattern: &&str,
@@ -67,24 +65,20 @@ async fn list_index_files_by_wildcard(
     let path = storage_path_for_account(&pattern);
 
     let folders =
-        list_s3_bucket_by_prefix(aws_config, s3_bucket, &format!("{}/{}/", s3_folder, path))
-            .await?;
+        list_s3_bucket_by_prefix(s3_client, s3_bucket, &format!("{}/{}/", s3_folder, path)).await?;
     // for each matching folder list files
     let mut results = vec![];
     for folder in folders {
-        results.extend(list_s3_bucket_by_prefix(aws_config, s3_bucket, &folder).await?);
+        results.extend(list_s3_bucket_by_prefix(s3_client, s3_bucket, &folder).await?);
     }
     Ok(results)
 }
 
 async fn list_s3_bucket_by_prefix(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     s3_bucket: &str,
     s3_prefix: &str,
 ) -> Result<Vec<String>> {
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
-
     let mut results = vec![];
     let mut continuation_token: Option<String> = None;
 
@@ -126,18 +120,15 @@ async fn list_s3_bucket_by_prefix(
 }
 
 pub async fn fetch_contract_index_files(
-    aws_config: &SdkConfig,
+    s3_client: &S3Client,
     s3_bucket: &str,
     s3_folder: &str,
     start_date: DateTime<Utc>,
     contract_pattern: &str,
 ) -> Result<Vec<String>> {
-    let s3_config: Config = aws_sdk_s3::config::Builder::from(aws_config).build();
-    let s3_client: S3Client = S3Client::from_conf(s3_config);
-
     // list all index files
     let file_list =
-        find_index_files_by_pattern(aws_config, s3_bucket, s3_folder, contract_pattern).await?;
+        find_index_files_by_pattern(s3_client, s3_bucket, s3_folder, contract_pattern).await?;
 
     let fetch_and_parse_tasks = file_list
         .into_iter()
@@ -146,7 +137,7 @@ pub async fn fetch_contract_index_files(
             let s3_client = s3_client.clone();
             async move {
                 // Fetch the file
-                fetch_text_file_from_s3(s3_bucket, key, s3_client).await
+                fetch_text_file_from_s3(s3_bucket, key, &s3_client).await
             }
         })
        .collect::<Vec<_>>();
@@ -162,7 +153,7 @@ pub async fn fetch_contract_index_files(
 pub async fn fetch_text_file_from_s3(
     s3_bucket: &str,
     key: String,
-    s3_client: S3Client,
+    s3_client: &S3Client,
 ) -> Result<String> {
     // todo: can we retry if this fails like the lake s3_fetcher fn does?
     // If so, can we differentiate between a file not existing (block height does not exist) and a network error?
@@ -216,11 +207,14 @@ mod tests {
     #[tokio::test]
     async fn list_delta_bucket() {
         let opts = Opts::test_opts_with_aws();
+        let aws_config = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let list = list_s3_bucket_by_prefix(
-            &opts.lake_aws_sdk_config(),
+            &s3_client,
             INDEXED_DATA_FILES_BUCKET,
-            &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER.to_string()),
+            &format!("{}/", INDEXED_ACTIONS_FILES_FOLDER),
         )
         .await
         .unwrap();
@@ -231,9 +225,12 @@ mod tests {
     #[tokio::test]
     async fn list_with_single_contract() {
         let opts = Opts::test_opts_with_aws();
+        let aws_config = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let list = find_index_files_by_pattern(
-            &opts.lake_aws_sdk_config(),
+            &s3_client,
             INDEXED_DATA_FILES_BUCKET,
             INDEXED_ACTIONS_FILES_FOLDER,
             "hackathon.agency.near",
@@ -247,9 +244,12 @@ mod tests {
     #[tokio::test]
     async fn list_with_csv_contracts() {
         let opts = Opts::test_opts_with_aws();
+        let aws_config = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let list = find_index_files_by_pattern(
-            &opts.lake_aws_sdk_config(),
+            &s3_client,
             INDEXED_DATA_FILES_BUCKET,
             INDEXED_ACTIONS_FILES_FOLDER,
             "hackathon.agency.near, hackathon.aurora-silo-dev.near, hackathon.sputnik-dao.near",
@@ -263,9 +263,12 @@ mod tests {
     #[tokio::test]
     async fn list_with_wildcard_contracts() {
         let opts = Opts::test_opts_with_aws();
+        let aws_config = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let list = find_index_files_by_pattern(
-            &opts.lake_aws_sdk_config(),
+            &s3_client,
             INDEXED_DATA_FILES_BUCKET,
             INDEXED_ACTIONS_FILES_FOLDER,
             "*.keypom.near",
@@ -279,9 +282,12 @@ mod tests {
     #[tokio::test]
     async fn list_with_csv_and_wildcard_contracts() {
         let opts = Opts::test_opts_with_aws();
+        let aws_config = &opts.lake_aws_sdk_config();
+        let s3_config = aws_sdk_s3::config::Builder::from(aws_config).build();
+        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
 
         let list = find_index_files_by_pattern(
-            &opts.lake_aws_sdk_config(),
+            &s3_client,
             INDEXED_DATA_FILES_BUCKET,
             INDEXED_ACTIONS_FILES_FOLDER,
             "*.keypom.near, hackathon.agency.near, *.nearcrowd.near",
@@ -322,14 +328,17 @@ mod tests {
         let s3_result = fetch_text_file_from_s3(
             format!("{}{}", LAKE_BUCKET_PREFIX, "mainnet").as_str(),
             "does_not_exist/block.json".to_string(),
-            s3_client,
+            &s3_client,
         )
         .await;
 
         if s3_result.is_err() {
             let wrapped_error = s3_result.err().unwrap();
             let error = wrapped_error.root_cause();
-            if let Some(_) = error.downcast_ref::<aws_sdk_s3::error::NoSuchKey>() {
+            if error
+                .downcast_ref::<aws_sdk_s3::error::NoSuchKey>()
+                .is_some()
+            {
                 success = true;
             } else {
                 println!("Failed to downcast error: {:?}", error);
diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs
index 9f8c2585a..27830f9a3 100644
--- a/indexer/queryapi_coordinator/src/utils.rs
+++ b/indexer/queryapi_coordinator/src/utils.rs
@@ -64,7 +64,7 @@ pub(crate) fn serialize_to_camel_case_json_string(
     // Convert keys to Camel Case
     to_camel_case_keys(&mut message_value);
 
-    return serde_json::to_string(&message_value);
+    serde_json::to_string(&message_value)
 }
 
 fn to_camel_case_keys(message_value: &mut Value) {
@@ -74,13 +74,13 @@ fn to_camel_case_keys(message_value: &mut Value) {
         for key in map.keys().cloned().collect::<Vec<String>>() {
             // Generate Camel Case Key
             let new_key = key
-                .split("_")
+                .split('_')
                 .enumerate()
                 .map(|(i, str)| {
                     if i > 0 {
                         return str[..1].to_uppercase() + &str[1..];
                    }
-                    return str.to_owned();
+                    str.to_owned()
                 })
                 .collect::<Vec<String>>()
                 .join("");
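For reference, the key conversion that `to_camel_case_keys` performs (unchanged here apart from clippy-style fixes) maps snake_case JSON keys to camelCase before the streamer message is written out. A standalone sketch of the per-key transformation; `to_camel_case` is a hypothetical free function mirroring the closure above, with a guard for empty segments added:

```rust
// snake_case -> camelCase, one key at a time.
fn to_camel_case(key: &str) -> String {
    key.split('_')
        .enumerate()
        .map(|(i, part)| {
            if i > 0 && !part.is_empty() {
                part[..1].to_uppercase() + &part[1..]
            } else {
                part.to_owned()
            }
        })
        .collect::<Vec<String>>()
        .join("")
}

fn main() {
    assert_eq!(to_camel_case("block_height"), "blockHeight");
    assert_eq!(to_camel_case("receipt_id"), "receiptId");
}
```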