Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jun 29, 2024
1 parent 5556bdb commit 13fd761
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tonic = "0.11.0"
prost = "0.12"
prost-types = "0.12"
tonic-build = "0.11.0"
tokio-stream = "0.1.15"

zetina-common = { path = "crates/common" }
zetina-compiler = { path = "crates/compiler" }
Expand Down
1 change: 1 addition & 0 deletions crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tonic.workspace = true
prost.workspace = true
async-stream.workspace = true
tokio-util.workspace = true
tokio-stream.workspace = true

[build-dependencies]
tonic-build.workspace = true
6 changes: 3 additions & 3 deletions crates/delegator/proto/delegator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ syntax = "proto3";
package delegator;

service DelegatorService {
rpc delegate(DelegateRequest) returns (DelegateResponse) {}
rpc Delegator(stream DelegateRequest) returns (stream DelegateResponse) {}
}

message DelegateRequest { string name = 1; }
message DelegateResponse { string message = 1; }
message DelegateRequest { bytes cairo_pie = 1; }
message DelegateResponse { bytes proof = 1; }
36 changes: 9 additions & 27 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,35 @@ use event_loop::delegator_loop;
use futures::executor::block_on;
use futures::FutureExt;
use libp2p::gossipsub::Event;
use starknet::signers::SigningKey;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;
use zetina_common::{
hash,
job::{Job, JobData},
job_witness::JobWitness,
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use zetina_common::job_witness::JobWitness;

pub struct Delegator<'identity> {
signing_key: &'identity SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
pub struct Delegator {
cancellation_token: CancellationToken,
handle: Option<JoinHandle<()>>,
}

impl<'identity> Delegator<'identity> {
impl Delegator {
pub fn new(
signing_key: &'identity SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
job_witness_tx: mpsc::Sender<JobWitness>,
job_witness_tx: broadcast::Sender<JobWitness>,
events_rx: mpsc::Receiver<Event>,
) -> Self {
let cancellation_token = CancellationToken::new();

Self {
signing_key,
job_topic_tx,
cancellation_token: cancellation_token.to_owned(),
handle: Some(tokio::spawn(async move {
delegator_loop(events_rx, job_witness_tx, cancellation_token).boxed().await
})),
}
}

pub async fn delegate(self, job_data: JobData) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
let job = Job::try_from_job_data(job_data, self.signing_key);
let serialized_job = serde_json::to_string(&job).unwrap();
self.job_topic_tx.send(serialized_job.into()).await?;
info!("Sent a new job: {}", hash!(&job));
Ok(())
}
}

impl<'identity> Drop for Delegator<'identity> {
impl Drop for Delegator {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
Expand Down
6 changes: 3 additions & 3 deletions crates/delegator/src/delegator/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use libp2p::gossipsub::Event;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::info;
use zetina_common::{
Expand All @@ -13,7 +13,7 @@ use zetina_common::{

pub async fn delegator_loop(
mut message_stream: mpsc::Receiver<Event>,
job_witness_tx: mpsc::Sender<JobWitness>,
job_witness_tx: broadcast::Sender<JobWitness>,
cancellation_token: CancellationToken,
) {
loop {
Expand All @@ -35,7 +35,7 @@ pub async fn delegator_loop(
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() {
let job_witness: JobWitness = serde_json::from_slice(&message.data).unwrap();
info!("Received finished job event: {}", hash!(&job_witness));
job_witness_tx.send(job_witness).await.unwrap();
job_witness_tx.send(job_witness).unwrap();
}
},
Event::Subscribed { peer_id, topic } => {
Expand Down
20 changes: 14 additions & 6 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ pub mod delegator;
pub mod swarm;
pub mod tonic;

use ::tonic::transport::Server;
use delegator::Delegator;
use libp2p::gossipsub;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url};
use swarm::SwarmRunner;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use tonic::{proto::delegator_service_server::DelegatorServiceServer, DelegatorGRPCServer};
use tracing_subscriber::EnvFilter;
use zetina_common::{
job_witness::JobWitness,
Expand Down Expand Up @@ -40,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let finished_job_topic = gossipsub_ident_topic(network, Topic::FinishedJob);

let (swarm_events_tx, swarm_events_rx) = mpsc::channel::<gossipsub::Event>(100);
let (job_witness_tx, job_witness_rx) = mpsc::channel::<JobWitness>(100);
let (job_witness_tx, job_witness_rx) = broadcast::channel::<JobWitness>(100);

let (new_job_topic_tx, new_job_topic_rx) = mpsc::channel::<Vec<u8>>(100);

Expand All @@ -51,12 +53,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
swarm_events_tx,
)?;

Delegator::new(
node_account.get_signing_key(),
Delegator::new(job_witness_tx, swarm_events_rx);

let server = DelegatorGRPCServer::new(
node_account.get_signing_key().to_owned(),
new_job_topic_tx,
job_witness_tx,
swarm_events_rx,
job_witness_rx,
);

Server::builder()
.add_service(DelegatorServiceServer::new(server))
.serve("[::1]:50051".parse().unwrap())
.await?;

Ok(())
}
2 changes: 0 additions & 2 deletions crates/delegator/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ impl SwarmRunner {

Ok(gossipsub::Behaviour::new(message_authenticity, config)?)
}

pub async fn run() {}
}

impl Drop for SwarmRunner {
Expand Down
69 changes: 61 additions & 8 deletions crates/delegator/src/tonic.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,76 @@
use tonic::{transport::Server, Request, Response, Status};
use async_stream::stream;
use futures::{Stream, StreamExt, TryStreamExt};
use starknet::signers::SigningKey;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use tokio::{
select,
sync::{broadcast, mpsc},
};
use tonic::{Request, Response, Status, Streaming};
use tracing::info;
use zetina_common::hash;
use zetina_common::{
job::{Job, JobData},
job_witness::JobWitness,
};

pub mod proto {
tonic::include_proto!("delegator");
}
use crate::tonic::proto::delegator_service_server::DelegatorService;
use proto::{DelegateRequest, DelegateResponse};

#[derive(Default)]
pub struct DelegatorGRPCServer {}
pub struct DelegatorGRPCServer {
signing_key: SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
job_witness_rx: broadcast::Receiver<JobWitness>,
}

impl DelegatorGRPCServer {
pub fn new(
signing_key: SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
job_witness_rx: broadcast::Receiver<JobWitness>,
) -> Self {
Self { signing_key, job_topic_tx, job_witness_rx }
}
}

#[tonic::async_trait]
impl DelegatorService for DelegatorGRPCServer {
async fn delegate(
type DelegatorStream = Pin<Box<dyn Stream<Item = Result<DelegateResponse, Status>> + Send>>;
async fn delegator(
&self,
request: Request<DelegateRequest>,
) -> Result<Response<DelegateResponse>, Status> {
request: Request<Streaming<DelegateRequest>>,
) -> Result<Response<Self::DelegatorStream>, Status> {
println!("Got a request from {:?}", request.remote_addr());
let mut in_stream = request.into_inner().into_stream().fuse();
let job_channel = self.job_topic_tx.clone();
let mut witness_channel = self.job_witness_rx.resubscribe();
let signing_key = self.signing_key.clone();

let out_stream = stream! {
loop {
select! {
Ok(request) = in_stream.select_next_some() => {
let job_data = JobData::new(0, request.cairo_pie);
let job = Job::try_from_job_data(job_data, &signing_key);
let serialized_job = serde_json::to_string(&job).unwrap();
job_channel.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
}
Ok(rx) = witness_channel.recv() => {
yield Ok(DelegateResponse { proof: rx.proof })
}
else => {
yield Err(Status::cancelled(""))
}
}
}
}
.boxed();

let reply = DelegateResponse { message: format!("Hello {}!", request.into_inner().name) };
Ok(Response::new(reply))
Ok(Response::new(out_stream))
}
}

0 comments on commit 13fd761

Please sign in to comment.