Skip to content

Commit

Permalink
zetina working
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Jun 30, 2024
1 parent bf5c464 commit 187e57a
Show file tree
Hide file tree
Showing 20 changed files with 1,513 additions and 194 deletions.
34 changes: 32 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,32 @@
target
*docker*
# Ignore Python virtual environment
venv/
env/

# Ignore Rust-related files
debug/
target/
Cargo.lock
*.rs.bk
*.pdb
/target
Scarb.lock
.snfoundry_cache/
stone-prover
bootloader*.json

# Ignore Python-specific files
*.pyc
__pycache__/
dist/
build/
*.egg-info/

# Ignore IPython/Jupyter Notebook checkpoints
.ipynb_checkpoints/

# Additional Docker-specific ignores
.dockerignore
.dockerignore.template
.dockerignore.generated
.dockerignore.custom
.dockerignore.local
21 changes: 20 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,23 @@ Scarb.lock

stone-prover

bootloader*.json
bootloader*.json

# Python
*.pyc
__pycache__/
venv/
env/
dist/
build/
*.egg-info/

# PyInstaller
dist/
build/

# IPython Notebook
.ipynb_checkpoints

# Jupyter Notebook
.ipynb_checkpoints/
19 changes: 7 additions & 12 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use futures::executor::block_on;
use libp2p::gossipsub::Event;
use std::hash::{DefaultHasher, Hash, Hasher};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::info;
use zetina_common::{
graceful_shutdown::shutdown_signal,
hash,
job::Job,
job_witness::JobWitness,
Expand All @@ -16,7 +15,6 @@ use zetina_common::{
};

pub struct Delegator {
cancellation_token: CancellationToken,
handle: Option<JoinHandle<Result<(), DelegatorError>>>,
}

Expand All @@ -25,10 +23,7 @@ impl Delegator {
job_witness_tx: broadcast::Sender<JobWitness>,
mut events_rx: mpsc::Receiver<Event>,
) -> Self {
let cancellation_token = CancellationToken::new();

Self {
cancellation_token: cancellation_token.to_owned(),
handle: Some(tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -48,7 +43,7 @@ impl Delegator {
// Received a finished-job message from the network
if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::FinishedJob).into() {
let job_witness: JobWitness = serde_json::from_slice(&message.data)?;
info!("Received finished job event: {}", hash!(&job_witness));
info!("Received finished job event: {}", &job_witness.job_hash);
job_witness_tx.send(job_witness)?;
}
},
Expand All @@ -61,7 +56,7 @@ impl Delegator {
_ => {}
}
},
_ = cancellation_token.cancelled() => {
_ = shutdown_signal() => {
break
}
else => break
Expand All @@ -75,12 +70,12 @@ impl Delegator {

impl Drop for Delegator {
fn drop(&mut self) {
self.cancellation_token.cancel();
block_on(async move {
if let Some(handle) = self.handle.take() {
let handle = self.handle.take();
tokio::spawn(async move {
if let Some(handle) = handle {
handle.await.unwrap().unwrap();
}
})
});
}
}

Expand Down
67 changes: 55 additions & 12 deletions crates/delegator/src/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
pub mod event_loop;

use event_loop::swarm_loop;
use futures::executor::block_on;
use futures::FutureExt;
use futures::StreamExt;
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::identity::Keypair;
use libp2p::swarm::NetworkBehaviour;
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{mdns, noise, tcp, yamux, SwarmBuilder};
use std::error::Error;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use zetina_common::graceful_shutdown::shutdown_signal;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
Expand All @@ -26,7 +24,7 @@ impl SwarmRunner {
pub fn new(
p2p_local_keypair: &Keypair,
subscribe_topics: Vec<IdentTopic>,
transmit_topics: Vec<(IdentTopic, mpsc::Receiver<Vec<u8>>)>,
mut transmit_topics: Vec<(IdentTopic, mpsc::Receiver<Vec<u8>>)>,
swarm_events_tx: mpsc::Sender<gossipsub::Event>,
) -> Result<Self, Box<dyn Error>> {
let mdns = mdns::tokio::Behaviour::new(
Expand All @@ -38,7 +36,11 @@ impl SwarmRunner {
let local_keypair = p2p_local_keypair.clone();
let mut swarm = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio()
.with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)?
.with_tcp(
tcp::Config::default().port_reuse(true),
noise::Config::new,
yamux::Config::default,
)?
.with_quic()
.with_behaviour(|_| behaviour)?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
Expand All @@ -53,7 +55,47 @@ impl SwarmRunner {

Ok(SwarmRunner {
handle: Some(tokio::spawn(async move {
swarm_loop(swarm, transmit_topics, swarm_events_tx).boxed().await
loop {
tokio::select! {
Some(data) = transmit_topics[0].1.recv() => {
debug!("Publishing to topic: {:?}", transmit_topics[0].0);
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(transmit_topics[0].0.clone(), data) {
error!("Publish error: {e:?}");
}
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discovered a new peer: {peer_id}");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discover peer has expired: {peer_id}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source,
message_id,
message,
})) => {
swarm_events_tx.send(gossipsub::Event::Message { propagation_source, message_id, message }).await.unwrap();
},
SwarmEvent::NewListenAddr { address, .. } => {
debug!("Local node is listening on {address}");
}
_ => {}
},
_ = shutdown_signal() => {
break
}
else => break
}
}
})),
})
}
Expand All @@ -74,10 +116,11 @@ impl SwarmRunner {

impl Drop for SwarmRunner {
fn drop(&mut self) {
block_on(async move {
if let Some(handle) = self.handle.take() {
let handle = self.handle.take();
tokio::spawn(async move {
if let Some(handle) = handle {
handle.await.unwrap();
}
})
});
}
}
59 changes: 0 additions & 59 deletions crates/delegator/src/swarm/event_loop.rs

This file was deleted.

29 changes: 16 additions & 13 deletions crates/delegator/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ use starknet::signers::SigningKey;
use std::collections::HashSet;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::pin::Pin;
use tokio::{
select,
sync::{broadcast, mpsc},
};
use tokio::sync::{broadcast, mpsc};
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, info};
use tracing::{debug, error, info};
use zetina_common::hash;
use zetina_common::{
job::{Job, JobData},
Expand Down Expand Up @@ -55,14 +52,19 @@ impl DelegatorService for DelegatorGRPCServer {

let out_stream = stream! {
loop {
select! {
Ok(request) = in_stream.select_next_some() => {
let job_data = JobData::new(0, request.cairo_pie);
let job = Job::try_from_job_data(job_data, &signing_key);
queue_set.insert(hash!(job));
let serialized_job = serde_json::to_string(&job).unwrap();
job_channel.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
tokio::select! {
Some(request) = in_stream.next() => {
match request {
Ok(r) => {
let job_data = JobData::new(0, r.cairo_pie);
let job = Job::try_from_job_data(job_data, &signing_key);
queue_set.insert(hash!(job));
let serialized_job = serde_json::to_string(&job).unwrap();
job_channel.send(serialized_job.into()).await.unwrap();
info!("Sent a new job: {}", hash!(&job));
}
Err(err) => error!("Error: {}",err)
}
}
Ok(rx) = witness_channel.recv() => {
debug!("Received job witness: {}", &rx.job_hash);
Expand All @@ -72,6 +74,7 @@ impl DelegatorService for DelegatorGRPCServer {
}
}
else => {
error!("Stream cancelled!");
yield Err(Status::cancelled(""))
}
}
Expand Down
Loading

0 comments on commit 187e57a

Please sign in to comment.