Skip to content

Commit

Permalink
Merge pull request #30 from iosis-tech/refactor
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
Okm165 authored Aug 19, 2024
2 parents 73d5daf + 3fdb79c commit 1aaaecf
Show file tree
Hide file tree
Showing 16 changed files with 880 additions and 914 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ bootloader*.json
*.swp
*.swo
.env
.resources

# IDE-specific
.vscode/
Expand Down
4 changes: 3 additions & 1 deletion crates/delegator/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct JobEventsRequest {
#[derive(Debug, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum JobEventsResponse {
Propagated,
BidReceived(String),
Delegated(String),
Finished(Vec<u8>),
Expand All @@ -70,7 +71,7 @@ pub async fn job_events_handler(
Query(input): Query<JobEventsRequest>,
) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
let stream = stream! {
let job_key = kad::RecordKey::new(
let job_key = kad::RecordKey::new(
&hex::decode(input.job_key)
.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?
);
Expand All @@ -81,6 +82,7 @@ pub async fn job_events_handler(
yield Event::default()
.json_data(
match event {
DelegatorEvent::Propagated => { JobEventsResponse::Propagated },
DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id.to_base58()) },
DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id.to_base58()) },
DelegatorEvent::Finished(data) => { JobEventsResponse::Finished(data) },
Expand Down
46 changes: 33 additions & 13 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use tokio_stream::StreamExt;
use tracing::{error, info};
use tracing::{error, info, warn};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::hash;
use zetina_common::job::{Job, JobBid, JobData};
use zetina_common::job_witness::JobWitness;
use zetina_common::process::Process;
use zetina_peer::swarm::{
use zetina_peer::{
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
};

Expand Down Expand Up @@ -91,7 +91,8 @@ impl Delegator {
info!("Propagated job: {} for bidding", hex::encode(&key));
let (process, bid_tx) = BidQueue::run(key.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(key, bid_tx);
job_hash_store.insert(key.to_owned(), bid_tx);
events_tx.send((key, DelegatorEvent::Propagated))?;
},
kad::QueryResult::GetRecord(Ok(
kad::GetRecordOk::FoundRecord(kad::PeerRecord {
Expand All @@ -111,16 +112,34 @@ impl Delegator {
_ => {}
}
}
Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
let bid = bids.first_key_value().unwrap();
let price = *bid.0;
let identity = *bid.1.first().unwrap();
info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))?
}).await?;
events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
Some(Ok((job_key, mut bids))) = job_bid_scheduler.next() => {
if let Some((price, identities)) = bids.pop_first() {
if identities.is_empty() {
warn!("Job {} did not receive any bids", hex::encode(&job_key));
} else {
for identity in identities {
let result = Box::pin(async {
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid {
identity,
job_key: job_key.clone(),
price,
}))?,
}).await?;

events_tx.send((job_key.clone(), DelegatorEvent::Delegated(identity)))?;
info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), &identity);
Ok::<(), Error>(())
}).await;

match result {
Ok(_) => break, // Break after successful delegation
Err(err) => error!(?err, "Failed to delegate job {}", hex::encode(&job_key)),
}
}
}
}
}
_ = shutdown_signal() => {
break
Expand All @@ -147,6 +166,7 @@ impl Drop for Delegator {

#[derive(Debug, Clone)]
pub enum DelegatorEvent {
Propagated,
BidReceived(PeerId),
Delegated(PeerId),
Finished(Vec<u8>),
Expand Down
2 changes: 1 addition & 1 deletion crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tower_http::{
};
use tracing_subscriber::EnvFilter;
use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
use zetina_peer::{GossipsubMessage, KademliaMessage, SwarmRunner};

#[derive(Parser)]
struct Cli {
Expand Down
4 changes: 2 additions & 2 deletions crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use zetina_common::{
graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness,
process::Process,
};
use zetina_peer::swarm::{
use zetina_peer::{
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
};
use zetina_prover::{
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Executor {
data: serde_json::to_vec(&MarketMessage::JobBid(JobBid {
identity,
job_key,
price: (runner_scheduler.len() * prover_scheduler.len()) as u64,
price: (runner_scheduler.len() + 2 * prover_scheduler.len()) as u64,
}))?
})
.await?
Expand Down
2 changes: 1 addition & 1 deletion crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::{net::TcpListener, sync::mpsc};
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
use tracing_subscriber::EnvFilter;
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
use zetina_peer::{GossipsubMessage, KademliaMessage, SwarmRunner};
use zetina_prover::stone_prover::StoneProver;
use zetina_runner::cairo_runner::CairoRunner;

Expand Down
Loading

0 comments on commit 1aaaecf

Please sign in to comment.