Skip to content

Commit

Permalink
improved grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jul 2, 2024
1 parent 7e7ea7d commit 0f13cdd
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 46 deletions.
34 changes: 30 additions & 4 deletions crates/delegator/proto/delegator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,37 @@ syntax = "proto3";
package delegator;

service DelegatorService {
rpc Delegator(stream DelegateRequest) returns (stream DelegateResponse) {}
rpc Delegate(DelegateRequest) returns (DelegateResponse) {}
rpc Events(EventsRequest) returns (stream EventsResponse) {}
}

message DelegateRequest {
bytes cairo_pie = 1;
}

message DelegateRequest { bytes cairo_pie = 1; }
message DelegateResponse {
bytes proof = 1;
uint64 job_hash = 2;
}
}

message EventsRequest {
uint64 job_hash = 1;
}

message EventsResponse {
EventType event_type = 1;
oneof event {
Picked picked = 2;
Proven proven = 3;
}
}

enum EventType {
PICKED = 0;
PROVEN = 1;
}

message Picked {}

message Proven {
bytes proof = 1;
}
7 changes: 6 additions & 1 deletion crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct Delegator {

impl Delegator {
pub fn new(
job_picked_tx: broadcast::Sender<Job>,
job_witness_tx: broadcast::Sender<JobWitness>,
mut events_rx: mpsc::Receiver<Event>,
) -> Self {
Expand All @@ -39,6 +40,7 @@ impl Delegator {
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() {
let job: Job = serde_json::from_slice(&message.data)?;
info!("Received picked job event: {}", hash!(&job));
job_picked_tx.send(job)?;
}
// Received a finished-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() {
Expand Down Expand Up @@ -84,7 +86,10 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum DelegatorError {
#[error("broadcast_send_error")]
BroadcastSendError(#[from] tokio::sync::broadcast::error::SendError<JobWitness>),
BroadcastJobSendError(#[from] tokio::sync::broadcast::error::SendError<Job>),

#[error("broadcast_send_error")]
BroadcastJobWitnessSendError(#[from] tokio::sync::broadcast::error::SendError<JobWitness>),

#[error("io")]
Io(#[from] std::io::Error),
Expand Down
5 changes: 4 additions & 1 deletion crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tonic::{proto::delegator_service_server::DelegatorServiceServer, DelegatorGR
use tracing_subscriber::EnvFilter;
use zetina_common::{
graceful_shutdown::shutdown_signal,
job::Job,
job_witness::JobWitness,
network::Network,
node_account::NodeAccount,
Expand Down Expand Up @@ -43,6 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let finished_job_topic = gossipsub_ident_topic(network, Topic::FinishedJob);

let (swarm_events_tx, swarm_events_rx) = mpsc::channel::<gossipsub::Event>(100);
let (job_picked_tx, job_picked_rx) = broadcast::channel::<Job>(100);
let (job_witness_tx, job_witness_rx) = broadcast::channel::<JobWitness>(100);

let (new_job_topic_tx, new_job_topic_rx) = mpsc::channel::<Vec<u8>>(100);
Expand All @@ -54,11 +56,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
swarm_events_tx,
)?;

Delegator::new(job_witness_tx, swarm_events_rx);
Delegator::new(job_picked_tx, job_witness_tx, swarm_events_rx);

let server = DelegatorGRPCServer::new(
node_account.get_signing_key().to_owned(),
new_job_topic_tx,
job_picked_rx,
job_witness_rx,
);

Expand Down
88 changes: 48 additions & 40 deletions crates/delegator/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ pub mod proto {
tonic::include_proto!("delegator");
}
use proto::delegator_service_server::DelegatorService;
use proto::{DelegateRequest, DelegateResponse};
use proto::events_response::Event;
use proto::{
DelegateRequest, DelegateResponse, EventType, EventsRequest, EventsResponse, Picked, Proven,
};

use async_stream::stream;
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{Stream, StreamExt};
use starknet::signers::SigningKey;
use std::collections::HashSet;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use tokio::sync::{broadcast, mpsc};
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, error, info};
use tonic::Response;
use tracing::info;
use zetina_common::hash;
use zetina_common::{
job::{Job, JobData},
Expand All @@ -22,66 +24,72 @@ use zetina_common::{
pub struct DelegatorGRPCServer {
signing_key: SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
job_picked_rx: broadcast::Receiver<Job>,
job_witness_rx: broadcast::Receiver<JobWitness>,
}

impl DelegatorGRPCServer {
pub fn new(
signing_key: SigningKey,
job_topic_tx: mpsc::Sender<Vec<u8>>,
job_picked_rx: broadcast::Receiver<Job>,
job_witness_rx: broadcast::Receiver<JobWitness>,
) -> Self {
Self { signing_key, job_topic_tx, job_witness_rx }
Self { signing_key, job_topic_tx, job_picked_rx, job_witness_rx }
}
}

#[tonic::async_trait]
impl DelegatorService for DelegatorGRPCServer {
type DelegatorStream = Pin<Box<dyn Stream<Item = Result<DelegateResponse, Status>> + Send>>;
async fn delegator(
&self,
request: Request<Streaming<DelegateRequest>>,
) -> Result<Response<Self::DelegatorStream>, Status> {
info!("Got a request from {:?}", request.remote_addr());
let mut in_stream = request.into_inner().into_stream().fuse();
let job_channel = self.job_topic_tx.clone();
let mut witness_channel = self.job_witness_rx.resubscribe();
let signing_key = self.signing_key.clone();
type EventsStream =
Pin<Box<dyn Stream<Item = Result<EventsResponse, tonic::Status>> + Send + 'static>>;

let mut queue_set = HashSet::<u64>::new();
async fn delegate(
&self,
request: tonic::Request<DelegateRequest>,
) -> Result<tonic::Response<DelegateResponse>, tonic::Status> {
let cairo_pie = request.into_inner().cairo_pie;
let job_data = JobData::new(0, cairo_pie);
let job = Job::try_from_job_data(job_data, &self.signing_key);
let serialized_job = serde_json::to_string(&job).unwrap();
self.job_topic_tx.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
Ok(Response::new(DelegateResponse { job_hash: hash!(&job) }))
}

let out_stream = stream! {
async fn events(
&self,
request: tonic::Request<EventsRequest>,
) -> Result<tonic::Response<Self::EventsStream>, tonic::Status> {
let job_hash = request.into_inner().job_hash;
let mut job_picked_rx = self.job_picked_rx.resubscribe();
let mut job_witness_rx = self.job_witness_rx.resubscribe();
let out_stream: Self::EventsStream = stream! {
loop {
tokio::select! {
Some(request) = in_stream.next() => {
match request {
Ok(r) => {
let job_data = JobData::new(0, r.cairo_pie);
let job = Job::try_from_job_data(job_data, &signing_key);
queue_set.insert(hash!(job));
let serialized_job = serde_json::to_string(&job).unwrap();
job_channel.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
}
Err(err) => error!("Error: {}",err)
Ok(job) = job_picked_rx.recv() => {
if hash!(job) == job_hash {
info!("Picked job sent via stream: {}", hash!(&job));
yield Ok(EventsResponse{
event_type: EventType::Picked.into(),
event: Some(Event::Picked(Picked{}))
})
}
}
Ok(rx) = witness_channel.recv() => {
debug!("Received job witness: {}", &rx.job_hash);
if let Some(job_hash) = queue_set.take(&rx.job_hash) {
info!("Received awaited job witness: {}", &job_hash);
yield Ok(DelegateResponse { job_hash, proof: rx.proof })
},
Ok(job_witness) = job_witness_rx.recv() => {
if job_witness.job_hash == job_hash {
info!("Proven job sent via stream: {}", job_witness.job_hash);
yield Ok(EventsResponse{
event_type: EventType::Proven.into(),
event: Some(Event::Proven(Proven{proof: job_witness.proof}))
})
}
}
else => {
error!("Stream cancelled!");
yield Err(Status::cancelled(""))
}
else => break
}
}
}
.boxed();

Ok(Response::new(out_stream))
}
}

0 comments on commit 0f13cdd

Please sign in to comment.