diff --git a/crates/delegator/src/delegator.rs b/crates/delegator/src/delegator.rs index 6b3fad5..cc081f6 100644 --- a/crates/delegator/src/delegator.rs +++ b/crates/delegator/src/delegator.rs @@ -14,6 +14,7 @@ use tracing::{error, info}; use zetina_common::graceful_shutdown::shutdown_signal; use zetina_common::hash; use zetina_common::job::{Job, JobBid, JobData}; +use zetina_common::job_witness::JobWitness; use zetina_common::process::Process; use zetina_peer::swarm::{ DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic, @@ -41,6 +42,8 @@ impl Delegator { >::new(); let mut job_hash_store = HashMap::>::new(); + let mut proof_hash_store = HashMap::::new(); + loop { tokio::select! { Some(job_data) = delegate_rx.recv() => { @@ -67,9 +70,12 @@ impl Delegator { } if message.topic == Topic::Delegation.into() { match serde_json::from_slice::(&message.data)? { - DelegationMessage::Finished(job_witness) => { - info!("Received finished job: {}", hex::encode(&job_witness.job_key)); - events_tx.send((job_witness.job_key, DelegatorEvent::Finished(job_witness.proof)))?; + DelegationMessage::Finished(proof_key, job_key) => { + if job_hash_store.remove(&job_key).is_some() { + info!("Received finished job: {} proof key: {}", hex::encode(&job_key), hex::encode(&proof_key)); + proof_hash_store.insert(proof_key.to_owned(), job_key); + kademlia_tx.send(KademliaMessage::GET(proof_key)).await?; + } } _ => {} } @@ -86,7 +92,19 @@ impl Delegator { let (process, bid_tx) = BidQueue::run(key.to_owned()); job_bid_scheduler.push(process); job_hash_store.insert(key, bid_tx); - } + }, + kad::QueryResult::GetRecord(Ok( + kad::GetRecordOk::FoundRecord(kad::PeerRecord { + record: kad::Record { key, value, .. }, + .. + }) + )) => { + if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) { + info!("job {} proof with key: {} returned in DHT", hex::encode(&job_key), hex::encode(&proof_key)); + let job_witness: JobWitness = serde_json::from_slice(&value)?; + events_tx.send((job_key, DelegatorEvent::Finished(job_witness.proof)))?; + } + }, _ => {} } } @@ -94,7 +112,6 @@ impl Delegator { } } Some(Ok((job_key, bids))) = job_bid_scheduler.next() => { - job_hash_store.remove(&job_key); let bid = bids.first_key_value().unwrap(); let price = *bid.0; let identity = *bid.1.first().unwrap(); diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 16c5fd8..31fa25e 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -1,12 +1,14 @@ use futures::{stream::FuturesUnordered, Stream}; use libp2p::{gossipsub, kad, PeerId}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::pin::Pin; use thiserror::Error; use tokio::sync::mpsc; use tokio::{sync::mpsc::Sender, task::JoinHandle}; use tokio_stream::StreamExt; use tracing::{error, info}; +use zetina_common::hash; use zetina_common::job::Job; use zetina_common::{ graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness, @@ -44,6 +46,7 @@ impl Executor { >::new(); let mut job_hash_store = HashSet::::new(); + let mut proof_hash_store = HashMap::::new(); loop { tokio::select! { @@ -88,11 +91,19 @@ impl Executor { .. }) )) => { - if job_hash_store.contains(&key) { + if job_hash_store.remove(&key) { let job: Job = serde_json::from_slice(&value)?; info!("received delegation of job: {}", hex::encode(&key)); runner_scheduler.push(runner.run(job)?); - job_hash_store.remove(&key); + } + }, + kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { + if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) { + info!("job {} proof with key: {} stored in DHT", hex::encode(&job_key), hex::encode(&proof_key)); + gossipsub_tx.send(GossipsubMessage { + topic: Topic::Delegation.into(), + data: serde_json::to_vec(&DelegationMessage::Finished(proof_key, job_key))? + }).await?; } } _ => {} @@ -106,11 +117,12 @@ impl Executor { prover_scheduler.push(prover.run(job_trace)?); }, Some(Ok(job_witness)) = prover_scheduler.next() => { - info!("Finished proving: {}", hex::encode(&job_witness.job_key)); - gossipsub_tx.send(GossipsubMessage { - topic: Topic::Delegation.into(), - data: serde_json::to_vec(&DelegationMessage::Finished(job_witness))? - }).await?; + let proof_key = kad::RecordKey::new(&hash!(job_witness).to_be_bytes()); + info!("Finished proving job: {} proof key: {}", hex::encode(&job_witness.job_key), hex::encode(&proof_key)); + proof_hash_store.insert(proof_key.to_owned(), job_witness.job_key.to_owned()); + kademlia_tx.send(KademliaMessage::PUT( + (proof_key, serde_json::to_vec(&job_witness)?) + )).await?; }, _ = shutdown_signal() => { break diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index 421243a..3992b4a 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -14,7 +14,6 @@ use tokio::sync::mpsc; use tracing::{debug, error, info}; use zetina_common::graceful_shutdown::shutdown_signal; use zetina_common::job::{Job, JobBid}; -use zetina_common::job_witness::JobWitness; #[derive(NetworkBehaviour)] pub struct PeerBehaviour { @@ -83,7 +82,7 @@ pub enum MarketMessage { #[derive(Debug, Serialize, Deserialize)] pub enum DelegationMessage { Delegate(JobBid), - Finished(JobWitness), + Finished(kad::RecordKey, kad::RecordKey), } impl SwarmRunner {