Skip to content

Commit

Permalink
Merge pull request #832 from near/main
Browse files Browse the repository at this point in the history
Prod Release 25/06/24
  • Loading branch information
morgsmccauley authored Jun 24, 2024
2 parents 78cf5ae + 056f272 commit b382782
Show file tree
Hide file tree
Showing 92 changed files with 4,772 additions and 2,515 deletions.
File renamed without changes.
101 changes: 101 additions & 0 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#![cfg_attr(test, allow(dead_code))]

use near_primitives::types::AccountId;

pub use runner::data_layer::TaskStatus;

use anyhow::Context;
use runner::data_layer::data_layer_client::DataLayerClient;
use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest};
use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;

#[cfg(not(test))]
pub use DataLayerHandlerImpl as DataLayerHandler;
#[cfg(test)]
pub use MockDataLayerHandlerImpl as DataLayerHandler;

type TaskId = String;

pub struct DataLayerHandlerImpl {
client: DataLayerClient<Channel>,
}

#[cfg_attr(test, mockall::automock)]
impl DataLayerHandlerImpl {
pub fn connect(runner_url: &str) -> anyhow::Result<Self> {
let channel = Channel::from_shared(runner_url.to_string())
.context("Runner URL is invalid")?
.connect_lazy();
let client = DataLayerClient::new(channel);

Ok(Self { client })
}

pub async fn start_provisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<TaskId> {
let request = ProvisionRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
schema: indexer_config.schema.clone(),
};

let response = self
.client
.clone()
.start_provisioning_task(Request::new(request))
.await?;

Ok(response.into_inner().task_id)
}

pub async fn start_deprovisioning_task(
&self,
account_id: AccountId,
function_name: String,
) -> anyhow::Result<TaskId> {
let request = DeprovisionRequest {
account_id: account_id.to_string(),
function_name,
};

let response = self
.client
.clone()
.start_deprovisioning_task(Request::new(request))
.await?;

Ok(response.into_inner().task_id)
}

pub async fn get_task_status(&self, task_id: TaskId) -> anyhow::Result<TaskStatus> {
let request = GetTaskStatusRequest { task_id };

let response = self
.client
.clone()
.get_task_status(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::NotFound {
return Ok(TaskStatus::Failed);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => TaskStatus::Pending,
2 => TaskStatus::Complete,
3 => TaskStatus::Failed,
_ => anyhow::bail!("Received invalid task status"),
};

Ok(status)
}
}
File renamed without changes.
3 changes: 3 additions & 0 deletions coordinator/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod block_streams;
pub mod data_layer;
pub mod executors;
Loading

0 comments on commit b382782

Please sign in to comment.