From 13fd761363db98a0a5cf2a356a629f712d98a064 Mon Sep 17 00:00:00 2001 From: Bartosz Nowak Date: Sat, 29 Jun 2024 20:13:55 +0200 Subject: [PATCH] poc --- Cargo.toml | 1 + crates/delegator/Cargo.toml | 1 + crates/delegator/proto/delegator.proto | 6 +- crates/delegator/src/delegator.rs | 36 +++------- crates/delegator/src/delegator/event_loop.rs | 6 +- crates/delegator/src/main.rs | 20 ++++-- crates/delegator/src/swarm.rs | 2 - crates/delegator/src/tonic.rs | 69 +++++++++++++++++--- 8 files changed, 92 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 675e34d..ff08124 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ tonic = "0.11.0" prost = "0.12" prost-types = "0.12" tonic-build = "0.11.0" +tokio-stream = "0.1.15" zetina-common = { path = "crates/common" } zetina-compiler = { path = "crates/compiler" } diff --git a/crates/delegator/Cargo.toml b/crates/delegator/Cargo.toml index a96c407..d439b22 100644 --- a/crates/delegator/Cargo.toml +++ b/crates/delegator/Cargo.toml @@ -26,6 +26,7 @@ tonic.workspace = true prost.workspace = true async-stream.workspace = true tokio-util.workspace = true +tokio-stream.workspace = true [build-dependencies] tonic-build.workspace = true \ No newline at end of file diff --git a/crates/delegator/proto/delegator.proto b/crates/delegator/proto/delegator.proto index 93d165f..2712477 100644 --- a/crates/delegator/proto/delegator.proto +++ b/crates/delegator/proto/delegator.proto @@ -3,8 +3,8 @@ syntax = "proto3"; package delegator; service DelegatorService { - rpc delegate(DelegateRequest) returns (DelegateResponse) {} + rpc Delegator(stream DelegateRequest) returns (stream DelegateResponse) {} } -message DelegateRequest { string name = 1; } -message DelegateResponse { string message = 1; } \ No newline at end of file +message DelegateRequest { bytes cairo_pie = 1; } +message DelegateResponse { bytes proof = 1; } \ No newline at end of file diff --git a/crates/delegator/src/delegator.rs b/crates/delegator/src/delegator.rs index e90ba17..8b35da5 100644 --- a/crates/delegator/src/delegator.rs +++ b/crates/delegator/src/delegator.rs @@ -4,53 +4,35 @@ use event_loop::delegator_loop; use futures::executor::block_on; use futures::FutureExt; use libp2p::gossipsub::Event; -use starknet::signers::SigningKey; -use std::hash::{DefaultHasher, Hash, Hasher}; -use tokio::{sync::mpsc, task::JoinHandle}; -use tokio_util::sync::CancellationToken; -use tracing::info; -use zetina_common::{ - hash, - job::{Job, JobData}, - job_witness::JobWitness, +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, }; +use tokio_util::sync::CancellationToken; +use zetina_common::job_witness::JobWitness; -pub struct Delegator<'identity> { - signing_key: &'identity SigningKey, - job_topic_tx: mpsc::Sender>, +pub struct Delegator { cancellation_token: CancellationToken, handle: Option>, } -impl<'identity> Delegator<'identity> { +impl Delegator { pub fn new( - signing_key: &'identity SigningKey, - job_topic_tx: mpsc::Sender>, - job_witness_tx: mpsc::Sender, + job_witness_tx: broadcast::Sender, events_rx: mpsc::Receiver, ) -> Self { let cancellation_token = CancellationToken::new(); Self { - signing_key, - job_topic_tx, cancellation_token: cancellation_token.to_owned(), handle: Some(tokio::spawn(async move { delegator_loop(events_rx, job_witness_tx, cancellation_token).boxed().await })), } } - - pub async fn delegate(self, job_data: JobData) -> Result<(), mpsc::error::SendError>> { - let job = Job::try_from_job_data(job_data, self.signing_key); - let serialized_job = serde_json::to_string(&job).unwrap(); - self.job_topic_tx.send(serialized_job.into()).await?; - info!("Sent a new job: {}", hash!(&job)); - Ok(()) - } } -impl<'identity> Drop for Delegator<'identity> { +impl Drop for Delegator { fn drop(&mut self) { self.cancellation_token.cancel(); block_on(async move { diff --git a/crates/delegator/src/delegator/event_loop.rs b/crates/delegator/src/delegator/event_loop.rs index d2d0005..09fa4e3 100644 --- a/crates/delegator/src/delegator/event_loop.rs +++ b/crates/delegator/src/delegator/event_loop.rs @@ -1,6 +1,6 @@ use libp2p::gossipsub::Event; use std::hash::{DefaultHasher, Hash, Hasher}; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tracing::info; use zetina_common::{ @@ -13,7 +13,7 @@ use zetina_common::{ pub async fn delegator_loop( mut message_stream: mpsc::Receiver, - job_witness_tx: mpsc::Sender, + job_witness_tx: broadcast::Sender, cancellation_token: CancellationToken, ) { loop { @@ -35,7 +35,7 @@ pub async fn delegator_loop( if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() { let job_witness: JobWitness = serde_json::from_slice(&message.data).unwrap(); info!("Received finished job event: {}", hash!(&job_witness)); - job_witness_tx.send(job_witness).await.unwrap(); + job_witness_tx.send(job_witness).unwrap(); } }, Event::Subscribed { peer_id, topic } => { diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index fc6b9dd..ebf58af 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -2,11 +2,13 @@ pub mod delegator; pub mod swarm; pub mod tonic; +use ::tonic::transport::Server; use delegator::Delegator; use libp2p::gossipsub; use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url}; use swarm::SwarmRunner; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; +use tonic::{proto::delegator_service_server::DelegatorServiceServer, DelegatorGRPCServer}; use tracing_subscriber::EnvFilter; use zetina_common::{ job_witness::JobWitness, @@ -40,7 +42,7 @@ async fn main() -> Result<(), Box> { let finished_job_topic = gossipsub_ident_topic(network, Topic::FinishedJob); let (swarm_events_tx, swarm_events_rx) = mpsc::channel::(100); - let (job_witness_tx, job_witness_rx) = mpsc::channel::(100); + let (job_witness_tx, job_witness_rx) = broadcast::channel::(100); let (new_job_topic_tx, new_job_topic_rx) = mpsc::channel::>(100); @@ -51,12 +53,18 @@ async fn main() -> Result<(), Box> { swarm_events_tx, )?; - Delegator::new( - node_account.get_signing_key(), + Delegator::new(job_witness_tx, swarm_events_rx); + + let server = DelegatorGRPCServer::new( + node_account.get_signing_key().to_owned(), new_job_topic_tx, - job_witness_tx, - swarm_events_rx, + job_witness_rx, ); + Server::builder() + .add_service(DelegatorServiceServer::new(server)) + .serve("[::1]:50051".parse().unwrap()) + .await?; + Ok(()) } diff --git a/crates/delegator/src/swarm.rs b/crates/delegator/src/swarm.rs index b0cb9b3..addbe7f 100644 --- a/crates/delegator/src/swarm.rs +++ b/crates/delegator/src/swarm.rs @@ -77,8 +77,6 @@ impl SwarmRunner { Ok(gossipsub::Behaviour::new(message_authenticity, config)?) } - - pub async fn run() {} } impl Drop for SwarmRunner { diff --git a/crates/delegator/src/tonic.rs b/crates/delegator/src/tonic.rs index 25cf0c5..3e51060 100644 --- a/crates/delegator/src/tonic.rs +++ b/crates/delegator/src/tonic.rs @@ -1,4 +1,19 @@ -use tonic::{transport::Server, Request, Response, Status}; +use async_stream::stream; +use futures::{Stream, StreamExt, TryStreamExt}; +use starknet::signers::SigningKey; +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::pin::Pin; +use tokio::{ + select, + sync::{broadcast, mpsc}, +}; +use tonic::{Request, Response, Status, Streaming}; +use tracing::info; +use zetina_common::hash; +use zetina_common::{ + job::{Job, JobData}, + job_witness::JobWitness, +}; pub mod proto { tonic::include_proto!("delegator"); @@ -6,18 +21,56 @@ pub mod proto { use crate::tonic::proto::delegator_service_server::DelegatorService; use proto::{DelegateRequest, DelegateResponse}; -#[derive(Default)] -pub struct DelegatorGRPCServer {} +pub struct DelegatorGRPCServer { + signing_key: SigningKey, + job_topic_tx: mpsc::Sender>, + job_witness_rx: broadcast::Receiver, +} + +impl DelegatorGRPCServer { + pub fn new( + signing_key: SigningKey, + job_topic_tx: mpsc::Sender>, + job_witness_rx: broadcast::Receiver, + ) -> Self { + Self { signing_key, job_topic_tx, job_witness_rx } + } +} #[tonic::async_trait] impl DelegatorService for DelegatorGRPCServer { - async fn delegate( + type DelegatorStream = Pin> + Send>>; + async fn delegator( &self, - request: Request, - ) -> Result, Status> { + request: Request>, + ) -> Result, Status> { println!("Got a request from {:?}", request.remote_addr()); + let mut in_stream = request.into_inner().into_stream().fuse(); + let job_channel = self.job_topic_tx.clone(); + let mut witness_channel = self.job_witness_rx.resubscribe(); + let signing_key = self.signing_key.clone(); + + let out_stream = stream! { + loop { + select! { + Ok(request) = in_stream.select_next_some() => { + let job_data = JobData::new(0, request.cairo_pie); + let job = Job::try_from_job_data(job_data, &signing_key); + let serialized_job = serde_json::to_string(&job).unwrap(); + job_channel.send(serialized_job.into()).await.unwrap(); + info!("Sent a new job: {}", hash!(&job)); + } + Ok(rx) = witness_channel.recv() => { + yield Ok(DelegateResponse { proof: rx.proof }) + } + else => { + yield Err(Status::cancelled("")) + } + } + } + } + .boxed(); - let reply = DelegateResponse { message: format!("Hello {}!", request.into_inner().name) }; - Ok(Response::new(reply)) + Ok(Response::new(out_stream)) } }