From 28e5fa6b7c6999bd287ccae9319cf64ff3cc0432 Mon Sep 17 00:00:00 2001
From: Bartosz Nowak
Date: Sun, 11 Aug 2024 20:36:56 +0200
Subject: [PATCH 1/4] store jobs in the DHT

---
 crates/common/src/job.rs                    | 13 +--
 crates/common/src/job_trace.rs              | 35 ++++++-
 crates/common/src/job_witness.rs            |  3 +-
 crates/delegator/src/api.rs                 | 15 ++-
 crates/delegator/src/bid_queue.rs           | 18 ++--
 crates/delegator/src/delegator.rs           | 70 ++++++++-----
 crates/delegator/src/main.rs                | 13 +--
 crates/executor/src/executor.rs             | 52 +++++++---
 crates/executor/src/main.rs                 |  8 +-
 crates/peer/Cargo.toml                      |  1 +
 crates/peer/src/swarm.rs                    | 98 +++++++++++++++++--
 crates/prover/Cargo.toml                    |  2 +
 crates/prover/src/stone_prover/mod.rs       |  9 +-
 .../prover/src/stone_prover/tests/models.rs |  7 +-
 crates/runner/Cargo.toml                    |  1 +
 crates/runner/src/cairo_runner/mod.rs       |  7 +-
 16 files changed, 255 insertions(+), 97 deletions(-)

diff --git a/crates/common/src/job.rs b/crates/common/src/job.rs
index 7bdd242..408e36b 100644
--- a/crates/common/src/job.rs
+++ b/crates/common/src/job.rs
@@ -1,6 +1,6 @@
 use crate::hash;
 use cairo_vm::vm::runners::cairo_pie::CairoPie;
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use serde::{Deserialize, Serialize};
 use starknet::signers::{SigningKey, VerifyingKey};
 use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
@@ -101,17 +101,10 @@ impl Display for Job {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct JobBid {
     pub identity: PeerId,
-    pub job_hash: u64,
-    pub price: u64,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
-pub struct JobDelegation {
-    pub identity: PeerId,
-    pub job: Job,
+    pub job_key: kad::RecordKey,
     pub price: u64,
 }
 
diff --git a/crates/common/src/job_trace.rs b/crates/common/src/job_trace.rs
index ac674a4..f4421c7 100644
--- a/crates/common/src/job_trace.rs
+++ b/crates/common/src/job_trace.rs
@@ -1,7 +1,9 @@
 use crate::hash;
+use libp2p::kad;
 use std::{
     fmt::Display,
     hash::{DefaultHasher, Hash, Hasher},
+    mem::ManuallyDrop,
 };
 use tempfile::NamedTempFile;
 
@@ -13,11 +15,38 @@
 
 #[derive(Debug)]
 pub struct JobTrace {
-    pub job_hash: u64,
+    pub job_key: kad::RecordKey,
     pub air_public_input: NamedTempFile, // Temporary file containing the public input
     pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
-    pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
-    pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
+    pub memory: ManuallyDrop<NamedTempFile>, // Temporary file containing memory data (required for air_private_input validity)
+    pub trace: ManuallyDrop<NamedTempFile>, // Temporary file containing trace data (required for air_private_input validity)
+}
+
+impl JobTrace {
+    pub fn new(
+        job_key: kad::RecordKey,
+        air_public_input: NamedTempFile,
+        air_private_input: NamedTempFile,
+        memory: NamedTempFile,
+        trace: NamedTempFile,
+    ) -> Self {
+        Self {
+            job_key,
+            air_public_input,
+            air_private_input,
+            memory: ManuallyDrop::new(memory),
+            trace: ManuallyDrop::new(trace),
+        }
+    }
+}
+
+impl Drop for JobTrace {
+    fn drop(&mut self) {
+        unsafe {
+            ManuallyDrop::drop(&mut self.memory);
+            ManuallyDrop::drop(&mut self.trace);
+        }
+    }
 }
 
 impl Hash for JobTrace {
diff --git a/crates/common/src/job_witness.rs b/crates/common/src/job_witness.rs
index
3ccb237..9446be4 100644
--- a/crates/common/src/job_witness.rs
+++ b/crates/common/src/job_witness.rs
@@ -1,4 +1,5 @@
 use crate::hash;
+use libp2p::kad;
 use serde::{Deserialize, Serialize};
 use std::{
     fmt::Display,
@@ -14,7 +15,7 @@ use std::{
 
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
 pub struct JobWitness {
-    pub job_hash: u64,
+    pub job_key: kad::RecordKey,
     pub proof: Vec<u8>,
 }
 
diff --git a/crates/delegator/src/api.rs b/crates/delegator/src/api.rs
index fe57936..0418192 100644
--- a/crates/delegator/src/api.rs
+++ b/crates/delegator/src/api.rs
@@ -6,7 +6,7 @@ use axum::{
 };
 use futures::StreamExt;
 use hyper::StatusCode;
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use serde::{Deserialize, Serialize};
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::{io, time::Duration};
@@ -19,7 +19,7 @@ use crate::delegator::DelegatorEvent;
 #[derive(Debug)]
 pub struct ServerState {
     pub delegate_tx: mpsc::Sender<JobData>,
-    pub events_rx: broadcast::Receiver<(u64, DelegatorEvent)>,
+    pub events_rx: broadcast::Receiver<(kad::RecordKey, DelegatorEvent)>,
 }
 
 impl Clone for ServerState {
@@ -39,7 +39,7 @@ pub struct DelegateRequest {
 
 #[derive(Debug, Serialize)]
 pub struct DelegateResponse {
-    job_hash: String,
+    job_hash: kad::RecordKey,
 }
 
 pub async fn deletage_handler(
@@ -47,14 +47,14 @@ pub async fn deletage_handler(
     Json(input): Json<DelegateRequest>,
 ) -> Result<Json<DelegateResponse>, StatusCode> {
     let job_data = JobData::new(input.pie);
-    let job_data_hash = hash!(&job_data);
+    let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
     state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
+    Ok(Json(DelegateResponse { job_hash: job_data_hash }))
 }
 
 #[derive(Debug, Deserialize)]
 pub struct JobEventsRequest {
-    job_hash: String,
+    job_hash: kad::RecordKey,
 }
 
 #[derive(Debug, Serialize)]
@@ -70,8 +70,7 @@ pub async fn job_events_handler(
     Query(input): Query<JobEventsRequest>,
 ) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
     let stream = stream! {
-        let job_hash = input.job_hash.parse::<u64>()
-            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
+        let job_hash = input.job_hash;
         loop {
             tokio::select! {
                Ok((hash, event)) = state.events_rx.recv() => {
diff --git a/crates/delegator/src/bid_queue.rs b/crates/delegator/src/bid_queue.rs
index f10fa60..c958442 100644
--- a/crates/delegator/src/bid_queue.rs
+++ b/crates/delegator/src/bid_queue.rs
@@ -1,7 +1,7 @@
-use libp2p::PeerId;
+use libp2p::{kad, PeerId};
 use std::{collections::BTreeMap, future::Future, pin::Pin, time::Duration};
 use tokio::{sync::mpsc, time::sleep};
-use zetina_common::{job::Job, process::Process};
+use zetina_common::process::Process;
 
 pub struct BidQueue {}
 
@@ -19,17 +19,21 @@ impl Default for BidQueue {
 
 impl BidQueue {
     pub fn run<'future>(
-        job: Job,
+        job_hash: kad::RecordKey,
     ) -> (
-        Process<'future, Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
+        Process<'future, Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
         mpsc::Sender<(u64, PeerId)>,
     ) {
         let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
         let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
         let future: Pin<
             Box<
-                dyn Future<Output = Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>
-                    + Send
+                dyn Future<
+                        Output = Result<
+                            (kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
+                            BidControllerError,
+                        >,
+                    > + Send
                     + '_,
             >,
         > = Box::pin(async move {
@@ -53,7 +57,7 @@ impl BidQueue {
                     }
                 }
                 _ = sleep(duration) => {
-                    break Ok((job, bids.take().ok_or(BidControllerError::BidsTerminated)?))
+                    break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
                 }
                 _ = terminate_rx.recv() => {
                     break Err(BidControllerError::TaskTerminated);
diff --git a/crates/delegator/src/delegator.rs b/crates/delegator/src/delegator.rs
index 0b2798d..6b3fad5 100644
--- a/crates/delegator/src/delegator.rs
+++ b/crates/delegator/src/delegator.rs
@@ -1,7 +1,7 @@
 use crate::bid_queue::{BidControllerError, BidQueue};
 use futures::stream::FuturesUnordered;
 use futures::Stream;
-use libp2p::{gossipsub, PeerId};
+use libp2p::{gossipsub, kad, PeerId};
 use starknet::signers::SigningKey;
 use std::collections::{BTreeMap, HashMap};
 use std::hash::{DefaultHasher, Hash, Hasher};
@@ -13,10 +13,10 @@ use tokio_stream::StreamExt;
 use tracing::{error, info};
 use zetina_common::graceful_shutdown::shutdown_signal;
 use zetina_common::hash;
-use zetina_common::job::{Job, JobData, JobDelegation};
+use zetina_common::job::{Job, JobBid, JobData};
 use zetina_common::process::Process;
 use zetina_peer::swarm::{
-    DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
+    DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
 };
 
 pub struct Delegator {
@@ -27,28 +27,28 @@ impl Delegator {
     pub fn new(
         mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
         gossipsub_tx: Sender<GossipsubMessage>,
+        kademlia_tx: Sender<KademliaMessage>,
         mut delegate_rx: mpsc::Receiver<JobData>,
-        events_tx: broadcast::Sender<(u64, DelegatorEvent)>,
+        events_tx: broadcast::Sender<(kad::RecordKey, DelegatorEvent)>,
         signing_key: SigningKey,
     ) -> Self {
         Self {
             handle: Some(tokio::spawn(async move {
                 let mut job_bid_scheduler = FuturesUnordered::<
-                    Process<Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
+                    Process<
+                        Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>,
+                    >,
                 >::new();
-                let mut job_hash_store = HashMap::<u64, mpsc::Sender<(u64, PeerId)>>::new();
+                let mut job_hash_store =
+                    HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
                 loop {
                     tokio::select! {
                         Some(job_data) = delegate_rx.recv() => {
                             let job = Job::try_from_job_data(job_data, &signing_key);
-                            gossipsub_tx.send(GossipsubMessage {
-                                topic: Topic::Market.into(),
-                                data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
- }).await?; - info!("Propagated job: {} for bidding", hash!(job)); - let (process, bid_tx) = BidQueue::run(job.to_owned()); - job_bid_scheduler.push(process); - job_hash_store.insert(hash!(job), bid_tx); + let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes()); + kademlia_tx.send(KademliaMessage::PUT( + (job_key, serde_json::to_vec(&job)?) + )).await?; }, Some(event) = swarm_events.next() => { match event { @@ -56,10 +56,10 @@ impl Delegator { if message.topic == Topic::Market.into() { match serde_json::from_slice::(&message.data)? { MarketMessage::JobBid(job_bid) => { - if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_hash) { - info!("Received job bid: {} price: {} from: {}", job_bid.job_hash, job_bid.price, job_bid.identity); + if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_key) { + info!("Received job bid: {} price: {} from: {}", hex::encode(&job_bid.job_key), job_bid.price, job_bid.identity); bid_tx.send((job_bid.price, job_bid.identity)).await?; - events_tx.send((job_bid.job_hash, DelegatorEvent::BidReceived(job_bid.identity)))?; + events_tx.send((job_bid.job_key, DelegatorEvent::BidReceived(job_bid.identity)))?; } } _ => {} @@ -68,27 +68,42 @@ impl Delegator { if message.topic == Topic::Delegation.into() { match serde_json::from_slice::(&message.data)? { DelegationMessage::Finished(job_witness) => { - info!("Received finished job: {}", job_witness.job_hash); - events_tx.send((job_witness.job_hash, DelegatorEvent::Finished(job_witness.proof)))?; + info!("Received finished job: {}", hex::encode(&job_witness.job_key)); + events_tx.send((job_witness.job_key, DelegatorEvent::Finished(job_witness.proof)))?; } _ => {} } } + }, + PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..}) => { + match result { + kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { + gossipsub_tx.send(GossipsubMessage { + topic: Topic::Market.into(), + data: serde_json::to_vec(&MarketMessage::JobBidPropagation(key.to_owned()))? + }).await?; + info!("Propagated job: {} for bidding", hex::encode(&key)); + let (process, bid_tx) = BidQueue::run(key.to_owned()); + job_bid_scheduler.push(process); + job_hash_store.insert(key, bid_tx); + } + _ => {} + } } _ => {} } } - Some(Ok((job, bids))) = job_bid_scheduler.next() => { - job_hash_store.remove(&hash!(job)); + Some(Ok((job_key, bids))) = job_bid_scheduler.next() => { + job_hash_store.remove(&job_key); let bid = bids.first_key_value().unwrap(); let price = *bid.0; let identity = *bid.1.first().unwrap(); - info!("Job {} delegated to best bidder: {}", hash!(job), identity); + info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity); gossipsub_tx.send(GossipsubMessage { topic: Topic::Delegation.into(), - data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{identity, job: job.to_owned(), price}))? + data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))? 
                            }).await?;
-                            events_tx.send((hash!(job), DelegatorEvent::Delegated(identity)))?;
+                            events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
                        }
                        _ = shutdown_signal() => {
                            break
@@ -125,8 +140,13 @@ pub enum Error {
     #[error("mpsc_send_error GossipsubMessage")]
     MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),
 
+    #[error("mpsc_send_error KademliaMessage")]
+    MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),
+
     #[error("mpsc_send_error DelegatorEvent")]
-    BreadcastSendErrorDelegatorEvent(#[from] broadcast::error::SendError<(u64, DelegatorEvent)>),
+    BreadcastSendErrorDelegatorEvent(
+        #[from] broadcast::error::SendError<(kad::RecordKey, DelegatorEvent)>,
+    ),
 
     #[error("mpsc_send_error JobBid")]
     MpscSendErrorJobBid(#[from] mpsc::error::SendError<(u64, PeerId)>),
 
diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs
index e2b50cc..d27d713 100644
--- a/crates/delegator/src/main.rs
+++ b/crates/delegator/src/main.rs
@@ -10,7 +10,7 @@ use axum::{
 };
 use clap::Parser;
 use delegator::{Delegator, DelegatorEvent};
-use libp2p::Multiaddr;
+use libp2p::{kad, Multiaddr};
 use starknet::{core::types::FieldElement, signers::SigningKey};
 use std::{str::FromStr, time::Duration};
 use tokio::{
@@ -24,7 +24,7 @@ use tower_http::{
 };
 use tracing_subscriber::EnvFilter;
 use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
-use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};
+use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
 
 #[derive(Parser)]
 struct Cli {
@@ -71,11 +71,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .unwrap();
 
     let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
-    let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
-    let (events_tx, events_rx) = broadcast::channel::<(u64, DelegatorEvent)>(100);
-    let swarm_events = swarm_runner.run(gossipsub_rx);
+    let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
+    let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
 
-    Delegator::new(swarm_events, gossipsub_tx, delegate_rx, events_tx, signing_key);
+    let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
+    let (events_tx, events_rx) = broadcast::channel::<(kad::RecordKey, DelegatorEvent)>(100);
+    Delegator::new(swarm_events, gossipsub_tx, kademlia_tx, delegate_rx, events_tx, signing_key);
 
     // Create a `TcpListener` using tokio.
    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index 753d226..16c5fd8 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -1,18 +1,19 @@
 use futures::{stream::FuturesUnordered, Stream};
-use libp2p::{gossipsub, PeerId};
-use std::hash::{DefaultHasher, Hash, Hasher};
+use libp2p::{gossipsub, kad, PeerId};
+use std::collections::HashSet;
 use std::pin::Pin;
 use thiserror::Error;
 use tokio::sync::mpsc;
 use tokio::{sync::mpsc::Sender, task::JoinHandle};
 use tokio_stream::StreamExt;
 use tracing::{error, info};
+use zetina_common::job::Job;
 use zetina_common::{
-    graceful_shutdown::shutdown_signal, hash, job::JobBid, job_trace::JobTrace,
-    job_witness::JobWitness, process::Process,
+    graceful_shutdown::shutdown_signal, job::JobBid, job_trace::JobTrace, job_witness::JobWitness,
+    process::Process,
 };
 use zetina_peer::swarm::{
-    DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
+    DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
 };
 use zetina_prover::{
     errors::ProverControllerError, stone_prover::StoneProver, traits::ProverController,
@@ -30,6 +31,7 @@ impl Executor {
         identity: PeerId,
         mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
         gossipsub_tx: Sender<GossipsubMessage>,
+        kademlia_tx: Sender<KademliaMessage>,
         runner: CairoRunner,
         prover: StoneProver,
     ) -> Self {
@@ -41,6 +43,8 @@
             Process<'_, Result<JobWitness, ProverControllerError>>,
         >::new();
 
+        let mut job_hash_store = HashSet::<kad::RecordKey>::new();
+
         loop {
             tokio::select! {
                 Some(event) = swarm_events.next() => {
@@ -48,13 +52,13 @@
                     PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
                         if message.topic == Topic::Market.into() {
                             match serde_json::from_slice::<MarketMessage>(&message.data)? {
-                                MarketMessage::Job(job) => {
+                                MarketMessage::JobBidPropagation(job_key) => {
                                     gossipsub_tx
                                         .send(GossipsubMessage {
                                             topic: Topic::Market.into(),
                                             data: serde_json::to_vec(&MarketMessage::JobBid(JobBid {
                                                 identity,
-                                                job_hash: hash!(job),
+                                                job_key,
                                                 price: (runner_scheduler.len() * prover_scheduler.len()) as u64,
                                             }))?
                                         })
@@ -67,23 +71,42 @@
                             match serde_json::from_slice::<DelegationMessage>(&message.data)? {
                                 DelegationMessage::Delegate(job_delegation) => {
                                     if job_delegation.identity == identity {
-                                        info!("Scheduled running of job: {}", hash!(job_delegation.job));
-                                        runner_scheduler.push(runner.run(job_delegation.job)?);
+                                        info!("received delegation of job: {}", hex::encode(&job_delegation.job_key));
+                                        job_hash_store.insert(job_delegation.job_key.to_owned());
+                                        kademlia_tx.send(KademliaMessage::GET(job_delegation.job_key)).await?;
                                     }
                                 }
                                 _ => {}
                             }
                         }
                     }
+                    PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..}) => {
+                        match result {
+                            kad::QueryResult::GetRecord(Ok(
+                                kad::GetRecordOk::FoundRecord(kad::PeerRecord {
+                                    record: kad::Record { key, value, .. },
+                                    ..
+                                })
+                            )) => {
+                                if job_hash_store.contains(&key) {
+                                    let job: Job = serde_json::from_slice(&value)?;
+                                    info!("Scheduled running of job: {}", hex::encode(&key));
+                                    runner_scheduler.push(runner.run(job)?);
+                                    job_hash_store.remove(&key);
+                                }
+                            }
+                            _ => {}
+                        }
+                    }
                     _ => {}
                 }
             }
             Some(Ok(job_trace)) = runner_scheduler.next() => {
-                info!("Scheduled proving of job_trace: {}", &job_trace.job_hash);
+                info!("Scheduled proving of job_trace: {}", hex::encode(&job_trace.job_key));
                 prover_scheduler.push(prover.run(job_trace)?);
             },
             Some(Ok(job_witness)) = prover_scheduler.next() => {
-                info!("Finished proving: {}", &job_witness.job_hash);
+                info!("Finished proving: {}", hex::encode(&job_witness.job_key));
                 gossipsub_tx.send(GossipsubMessage {
                     topic: Topic::Delegation.into(),
                     data: serde_json::to_vec(&DelegationMessage::Finished(job_witness))?
@@ -120,8 +143,11 @@ pub enum Error {
     #[error("runner_controller_error")]
     RunnerControllerError(#[from] RunnerControllerError),
 
-    #[error("mpsc_send_error")]
-    MpscSendError(#[from] mpsc::error::SendError<GossipsubMessage>),
+    #[error("mpsc_send_error GossipsubMessage")]
+    MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),
+
+    #[error("mpsc_send_error KademliaMessage")]
+    MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),
 
     #[error("io")]
     Io(#[from] std::io::Error),
diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs
index 58f257b..cf11c53 100644
--- a/crates/executor/src/main.rs
+++ b/crates/executor/src/main.rs
@@ -11,7 +11,7 @@ use tokio::{net::TcpListener, sync::mpsc};
 use tower_http::{timeout::TimeoutLayer, trace::TraceLayer};
 use tracing_subscriber::EnvFilter;
 use zetina_common::graceful_shutdown::shutdown_signal;
-use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};
+use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};
 use zetina_prover::stone_prover::StoneProver;
 use zetina_runner::cairo_runner::CairoRunner;
 
@@ -68,12 +68,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .unwrap();
 
     let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
-    let swarm_events = swarm_runner.run(gossipsub_rx);
+    let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
+    let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
 
     let runner = CairoRunner::new(bootloader_program_path, signing_key.verifying_key());
     let prover = StoneProver::new();
-
-    Executor::new(identity, swarm_events, gossipsub_tx, runner, prover);
+    Executor::new(identity, swarm_events, gossipsub_tx, kademlia_tx, runner, prover);
 
     // Create a `TcpListener` using tokio.
    let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
diff --git a/crates/peer/Cargo.toml b/crates/peer/Cargo.toml
index bbabccf..d7e5cdc 100644
--- a/crates/peer/Cargo.toml
+++ b/crates/peer/Cargo.toml
@@ -11,6 +11,7 @@ version.workspace = true
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+hex.workspace = true
 async-stream.workspace = true
 futures.workspace = true
 thiserror.workspace = true
diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs
index 04e4e27..dc649d0 100644
--- a/crates/peer/src/swarm.rs
+++ b/crates/peer/src/swarm.rs
@@ -3,20 +3,23 @@ use futures::stream::Stream;
 use libp2p::futures::StreamExt;
 use libp2p::gossipsub::{self, IdentTopic, TopicHash};
 use libp2p::identity::Keypair;
+use libp2p::kad::store::MemoryStore;
+use libp2p::kad::Mode;
 use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
-use libp2p::{noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder};
+use libp2p::{kad, noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder};
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
 use std::time::Duration;
 use tokio::sync::mpsc;
 use tracing::{debug, error, info};
 use zetina_common::graceful_shutdown::shutdown_signal;
-use zetina_common::job::{Job, JobBid, JobDelegation};
+use zetina_common::job::{Job, JobBid};
 use zetina_common::job_witness::JobWitness;
 
 #[derive(NetworkBehaviour)]
 pub struct PeerBehaviour {
     gossipsub: gossipsub::Behaviour,
+    kademlia: kad::Behaviour<MemoryStore>,
 }
 
 pub struct SwarmRunner {
@@ -53,11 +56,18 @@ impl From<Topic> for IdentTopic {
     }
 }
 
+#[derive(Debug)]
 pub struct GossipsubMessage {
     pub topic: IdentTopic,
     pub data: Vec<u8>,
 }
 
+#[derive(Debug)]
+pub enum KademliaMessage {
+    GET(kad::RecordKey),
+    PUT((kad::RecordKey, Vec<u8>)),
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub enum NetworkingMessage {
     Multiaddr(Multiaddr),
@@ -66,12 +76,13 @@ pub enum NetworkingMessage {
 #[derive(Debug, Serialize, Deserialize)]
 pub enum MarketMessage {
     Job(Job),
+    JobBidPropagation(kad::RecordKey),
     JobBid(JobBid),
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum DelegationMessage {
-    Delegate(JobDelegation),
+    Delegate(JobBid),
     Finished(JobWitness),
 }
 
@@ -90,6 +101,10 @@ impl SwarmRunner {
             )?
             .with_quic()
             .with_behaviour(|p2p_keypair| PeerBehaviour {
+                kademlia: kad::Behaviour::new(
+                    p2p_keypair.public().to_peer_id(),
+                    MemoryStore::new(p2p_keypair.public().to_peer_id()),
+                ),
                 gossipsub: Self::init_gossip(p2p_keypair).unwrap(),
             })?
             .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
             .build();
 
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?;
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Market.as_str()))?;
         swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?;
+        swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
 
         // swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
         swarm.listen_on(listen_multiaddr)?;
@@ -122,18 +138,38 @@ impl SwarmRunner {
     pub fn run(
         mut self,
         mut gossipsub_message: mpsc::Receiver<GossipsubMessage>,
+        mut kademlia_message: mpsc::Receiver<KademliaMessage>,
     ) -> Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>> {
         let stream = stream! {
             loop {
                 tokio::select!
{ Some(message) = gossipsub_message.recv() => { - debug!{"Sending gossipsub_message: topic {}, data {:?}", message.topic, message.data}; + debug!{"Sending gossipsub_message: topic {}, data {}", message.topic, hex::encode(&message.data)}; if let Err(e) = self.swarm .behaviour_mut() .gossipsub .publish(message.topic, message.data) { - error!("Publish error: {e:?}"); + error!("Gossipsub error: {e:?}"); + } + }, + Some(message) = kademlia_message.recv() => { + debug!{"Sending kademlia_message: {:?}", message}; + match message { + KademliaMessage::GET(key) => { + self.swarm.behaviour_mut().kademlia.get_record(kad::RecordKey::new(&key)); + }, + KademliaMessage::PUT((key, data)) => { + let record = kad::Record { + key: kad::RecordKey::new(&key), + value: data, + publisher: Some(*self.swarm.local_peer_id()), + expires: None, + }; + if let Err(e) = self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) { + error!("Kademlia error: {e:?}"); + } + }, } }, event = self.swarm.select_next_some() => match event { @@ -175,13 +211,59 @@ impl SwarmRunner { message, }); } - SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, .. } => { + SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, endpoint, .. } => { info!{"Connection established: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + self.swarm.behaviour_mut().kademlia.add_address(&peer_id, endpoint.get_remote_address().to_owned()); } - SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, .. } => { + SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, endpoint, .. } => { info!{"Connection closed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established}; - self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + if num_established == 0 { + self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id); + self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address()); + } + } + SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => { + match result { + kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. 
})) => { + for peer in providers { + info!("Peer {peer:?} provides key {}", hex::encode(&key)); + } + } + kad::QueryResult::GetProviders(Err(err)) => { + error!("Failed to get providers: {err:?}"); + } + kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))) => { + info!("Successfully got record {}", hex::encode(&record.key)); + + yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, + result: kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(kad::PeerRecord {peer, record}))), + stats, step }) + } + kad::QueryResult::GetRecord(Ok(_)) => {} + kad::QueryResult::GetRecord(Err(err)) => { + error!("Failed to get record: {err:?}"); + } + kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => { + info!("Successfully put record {}", hex::encode(&key)); + + yield PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, + result: kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })), + stats, step }) + } + kad::QueryResult::PutRecord(Err(err)) => { + error!("Failed to put record: {err:?}"); + } + kad::QueryResult::StartProviding(Ok(kad::AddProviderOk { key })) => { + info!("Successfully put provider record {}", hex::encode(&key)); + } + kad::QueryResult::StartProviding(Err(err)) => { + error!("Failed to put provider record: {err:?}"); + } + event => { + debug!("Unhandled event: {:?}", event); + } + } } SwarmEvent::Behaviour(event) => { yield event; diff --git a/crates/prover/Cargo.toml b/crates/prover/Cargo.toml index e9cf6dc..2f89ba0 100644 --- a/crates/prover/Cargo.toml +++ b/crates/prover/Cargo.toml @@ -11,6 +11,8 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +hex.workspace = true +libp2p.workspace = true async-process.workspace = true cairo-proof-parser.workspace = true futures.workspace= true diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs index 76bd689..4986858 100644 --- a/crates/prover/src/stone_prover/mod.rs +++ b/crates/prover/src/stone_prover/mod.rs @@ -8,14 +8,13 @@ use futures::Future; use serde_json::Value; use std::{ fs, - hash::{DefaultHasher, Hash, Hasher}, io::{Read, Write}, pin::Pin, }; use tempfile::NamedTempFile; use tokio::{process::Command, select, sync::mpsc}; use tracing::debug; -use zetina_common::{hash, job_trace::JobTrace, job_witness::JobWitness, process::Process}; +use zetina_common::{job_trace::JobTrace, job_witness::JobWitness, process::Process}; pub mod tests; pub mod types; @@ -67,9 +66,7 @@ impl ProverController for StoneProver { .stdout(Stdio::null()) .spawn()?; - let job_trace_hash = hash!(job_trace); - - debug!("task {} spawned", job_trace_hash); + debug!("task {} spawned", hex::encode(&job_trace.job_key)); loop { select! 
{ @@ -91,7 +88,7 @@ impl ProverController for StoneProver { let mut proof = Vec::new(); out_file.read_to_end(&mut proof)?; - Ok(JobWitness { job_hash: job_trace.job_hash, proof }) + Ok(JobWitness { job_key: job_trace.job_key.to_owned(), proof }) }); Ok(Process::new(future, terminate_tx)) diff --git a/crates/prover/src/stone_prover/tests/models.rs b/crates/prover/src/stone_prover/tests/models.rs index b5d8650..b718f2b 100644 --- a/crates/prover/src/stone_prover/tests/models.rs +++ b/crates/prover/src/stone_prover/tests/models.rs @@ -1,3 +1,4 @@ +use libp2p::kad; use std::{env, fs, io::Write, path::PathBuf}; use tempfile::NamedTempFile; use zetina_common::job_trace::JobTrace; @@ -28,12 +29,12 @@ pub fn fixture() -> TestFixture { trace.write_all(&fs::read(trace_path).unwrap()).unwrap(); TestFixture { - job_trace: JobTrace { - job_hash: u64::default(), + job_trace: JobTrace::new( + kad::RecordKey::new(&[0]), air_public_input, air_private_input, memory, trace, - }, + ), } } diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml index a3c0234..0866f59 100644 --- a/crates/runner/Cargo.toml +++ b/crates/runner/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +hex.workspace = true async-process.workspace = true futures.workspace = true libp2p.workspace = true diff --git a/crates/runner/src/cairo_runner/mod.rs b/crates/runner/src/cairo_runner/mod.rs index 269f3d5..2453b9c 100644 --- a/crates/runner/src/cairo_runner/mod.rs +++ b/crates/runner/src/cairo_runner/mod.rs @@ -2,6 +2,7 @@ use self::types::input::SimpleBootloaderInput; use crate::{errors::RunnerControllerError, traits::RunnerController}; use async_process::Stdio; use futures::Future; +use libp2p::kad; use starknet::signers::VerifyingKey; use std::{ hash::{DefaultHasher, Hash, Hasher}, @@ -36,7 +37,7 @@ impl RunnerController for CairoRunner { let future: Pin< Box> + Send + '_>, > = Box::pin(async move { - let job_hash = hash!(job); + let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes()); let layout: &str = Layout::Starknet.into(); let mut cairo_pie = NamedTempFile::new()?; @@ -77,7 +78,7 @@ impl RunnerController for CairoRunner { .stdout(Stdio::null()) .spawn()?; - debug!("task {} spawned", job_hash); + debug!("task {} spawned", hex::encode(&job_key)); loop { select! 
{
@@ -95,7 +96,7 @@
                 }
             }
         }
-        Ok(JobTrace { job_hash, air_public_input, air_private_input, memory, trace })
+        Ok(JobTrace::new(job_key, air_public_input, air_private_input, memory, trace))
     });
 
     Ok(Process::new(future, terminate_tx))

From e32574aafa74d78dda0c625c2e19f222918bdc77 Mon Sep 17 00:00:00 2001
From: Bartosz Nowak
Date: Mon, 12 Aug 2024 00:12:44 +0200
Subject: [PATCH 2/4] proof verification and increased DHT limits

---
 crates/delegator/src/api.rs           | 25 +++--
 crates/peer/src/swarm.rs              | 21 ++--
 crates/prover/src/stone_prover/mod.rs |  6 +-
 dashboard/package-lock.json           | 72 +++++++++++++
 dashboard/package.json                | 14 ++-
 dashboard/src/app/api.ts              | 21 ++--
 dashboard/src/app/globals.css         | 10 ++
 dashboard/src/app/page.tsx            | 115 +++++++++++++++++---
 dashboard/src/utils/loadModule.ts     | 150 ++++++++++++++++++++++++++
 dashboard/src/utils/types.ts          | 13 +++
 dashboard/src/worker.ts               | 20 ++++
 docker-compose.yaml                   |  8 +-
 12 files changed, 430 insertions(+), 45 deletions(-)
 create mode 100644 dashboard/src/utils/loadModule.ts
 create mode 100644 dashboard/src/utils/types.ts
 create mode 100644 dashboard/src/worker.ts

diff --git a/crates/delegator/src/api.rs b/crates/delegator/src/api.rs
index 0418192..17b008a 100644
--- a/crates/delegator/src/api.rs
+++ b/crates/delegator/src/api.rs
@@ -6,7 +6,7 @@ use axum::{
 };
 use futures::StreamExt;
 use hyper::StatusCode;
-use libp2p::{kad, PeerId};
+use libp2p::kad;
 use serde::{Deserialize, Serialize};
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::{io, time::Duration};
@@ -39,7 +39,7 @@ pub struct DelegateRequest {
 
 #[derive(Debug, Serialize)]
 pub struct DelegateResponse {
-    job_hash: kad::RecordKey,
+    job_key: String,
 }
 
 pub async fn deletage_handler(
@@ -49,19 +49,19 @@ pub async fn deletage_handler(
     let job_data = JobData::new(input.pie);
     let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
     state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-    Ok(Json(DelegateResponse { job_hash: job_data_hash }))
+    Ok(Json(DelegateResponse { job_key: hex::encode(job_data_hash) }))
 }
 
 #[derive(Debug, Deserialize)]
 pub struct JobEventsRequest {
-    job_hash: kad::RecordKey,
+    job_key: String,
 }
 
 #[derive(Debug, Serialize)]
 #[serde(tag = "type", content = "data")]
 pub enum JobEventsResponse {
-    BidReceived(PeerId),
-    Delegated(PeerId),
+    BidReceived(String),
+    Delegated(String),
     Finished(Vec<u8>),
 }
 
 pub async fn job_events_handler(
     Query(input): Query<JobEventsRequest>,
 ) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
     let stream = stream! {
-        let job_hash = input.job_hash;
+        let job_key = kad::RecordKey::new(
+            &hex::decode(input.job_key)
+                .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?
+        );
         loop {
             tokio::select!
{ - Ok((hash, event)) = state.events_rx.recv() => { - if hash == job_hash { + Ok((key, event)) = state.events_rx.recv() => { + if key == job_key { yield Event::default() .json_data( match event { - DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id) }, - DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id) }, + DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id.to_base58()) }, + DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id.to_base58()) }, DelegatorEvent::Finished(data) => { JobEventsResponse::Finished(data) }, } ) diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index dc649d0..ff90649 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -3,8 +3,8 @@ use futures::stream::Stream; use libp2p::futures::StreamExt; use libp2p::gossipsub::{self, IdentTopic, TopicHash}; use libp2p::identity::Keypair; -use libp2p::kad::store::MemoryStore; -use libp2p::kad::Mode; +use libp2p::kad::store::{MemoryStore, MemoryStoreConfig}; +use libp2p::kad::{Config, Mode}; use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent}; use libp2p::{kad, noise, tcp, yamux, Multiaddr, Swarm, SwarmBuilder}; use serde::{Deserialize, Serialize}; @@ -92,6 +92,8 @@ impl SwarmRunner { p2p_keypair: Keypair, p2p_multiaddr: Multiaddr, ) -> Result> { + let mut config = Config::default(); + config.set_max_packet_size(1024*1024*100); let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair) .with_tokio() .with_tcp( @@ -101,13 +103,20 @@ impl SwarmRunner { )? .with_quic() .with_behaviour(|p2p_keypair| PeerBehaviour { - kademlia: kad::Behaviour::new( + kademlia: kad::Behaviour::with_config( p2p_keypair.public().to_peer_id(), - MemoryStore::new(p2p_keypair.public().to_peer_id()), + MemoryStore::with_config( + p2p_keypair.public().to_peer_id(), + MemoryStoreConfig { + max_value_bytes: 1024*1024*100, + ..Default::default() + }, + ), + config ), gossipsub: Self::init_gossip(p2p_keypair).unwrap(), })? 
- .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10))) .build(); swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?; @@ -163,7 +172,7 @@ impl SwarmRunner { let record = kad::Record { key: kad::RecordKey::new(&key), value: data, - publisher: Some(*self.swarm.local_peer_id()), + publisher: None, expires: None, }; if let Err(e) = self.swarm.behaviour_mut().kademlia.put_record(record, kad::Quorum::One) { diff --git a/crates/prover/src/stone_prover/mod.rs b/crates/prover/src/stone_prover/mod.rs index 4986858..358100d 100644 --- a/crates/prover/src/stone_prover/mod.rs +++ b/crates/prover/src/stone_prover/mod.rs @@ -124,10 +124,10 @@ pub fn params(n_steps: u64) -> Params { ) .collect(), last_layer_degree_bound, - n_queries: 1, - proof_of_work_bits: 1, + n_queries: 20, + proof_of_work_bits: 30, }, - log_n_cosets: 1, + log_n_cosets: 2, }, ..Default::default() } diff --git a/dashboard/package-lock.json b/dashboard/package-lock.json index 7c676c9..c52bcd8 100644 --- a/dashboard/package-lock.json +++ b/dashboard/package-lock.json @@ -16,6 +16,18 @@ "react": "^18", "react-dom": "^18", "react-dropzone": "^14.2.3", + "swiftness-dex-blake2s": "^0.0.7", + "swiftness-dex-keccak": "^0.0.7", + "swiftness-recursive-blake2s": "^0.0.7", + "swiftness-recursive-keccak": "^0.0.7", + "swiftness-recursive-with-poseidon-blake2s": "^0.0.7", + "swiftness-recursive-with-poseidon-keccak": "^0.0.7", + "swiftness-small-blake2s": "^0.0.7", + "swiftness-small-keccak": "^0.0.7", + "swiftness-starknet-blake2s": "^0.0.7", + "swiftness-starknet-keccak": "^0.0.7", + "swiftness-starknet-with-keccak-blake2s": "^0.0.7", + "swiftness-starknet-with-keccak-keccak": "^0.0.7", "zod": "^3.23.8" }, "devDependencies": { @@ -5282,6 +5294,66 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/swiftness-dex-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-dex-blake2s/-/swiftness-dex-blake2s-0.0.7.tgz", + "integrity": "sha512-jGYc9emS65RLK/6+iPtjKT1j2lEKKkYAMM/hGT87Idd46AuYlePEL8Qjn8mUljkoTz5G3eu5X+Z+Rs2fXGoiYQ==" + }, + "node_modules/swiftness-dex-keccak": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-dex-keccak/-/swiftness-dex-keccak-0.0.7.tgz", + "integrity": "sha512-WhR8jo08RptvnJabTiqpwcAslXRQmjHpImHpOx+V8E7u82AXUMNuF4Q5cHx9+KY4U8G7eLVXJpNGUwr1SDbsyg==" + }, + "node_modules/swiftness-recursive-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-recursive-blake2s/-/swiftness-recursive-blake2s-0.0.7.tgz", + "integrity": "sha512-aqVJhrkcjJjD96tBTvLctVSJGzVbBks1ahmIwTtfhtD30vJ58IlW+gmHF0lXaea475holswbYhBBVF8vXIMcfA==" + }, + "node_modules/swiftness-recursive-keccak": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-recursive-keccak/-/swiftness-recursive-keccak-0.0.7.tgz", + "integrity": "sha512-UO2A3iVkdvWhl5JgGt+B1c+tuJR1Xnx42pQjrWiFcTPkmxHCJo2MMfopz4H9mYe/Vj04S2dCUl9S8/bdkmJcdA==" + }, + "node_modules/swiftness-recursive-with-poseidon-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-recursive-with-poseidon-blake2s/-/swiftness-recursive-with-poseidon-blake2s-0.0.7.tgz", + "integrity": "sha512-mWhi2c4PPU0ANTPrDl+CIWNJIdo3GoGKXsfEbngcRh/2rHm+Ufc+6JnldNVP7FRc7Dr58Ym7pwmhbbZzXRRf5w==" + }, + "node_modules/swiftness-recursive-with-poseidon-keccak": { + "version": "0.0.7", + "resolved": 
"https://registry.npmjs.org/swiftness-recursive-with-poseidon-keccak/-/swiftness-recursive-with-poseidon-keccak-0.0.7.tgz", + "integrity": "sha512-QEXte36px+sbcUZtPHilGk/qyFUKt1NsBU81T892N/m9TDccxvG2fGM9cObNMuRscKy8fsdMppA8AQcC88NBgA==" + }, + "node_modules/swiftness-small-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-small-blake2s/-/swiftness-small-blake2s-0.0.7.tgz", + "integrity": "sha512-bVKFaBlG1wMXc5dTFthivgMwg8H5LzStifjme3cA51198NfPCaEZv8vDTa+pvMa1iSgkM0jInK3VVW652FuABw==" + }, + "node_modules/swiftness-small-keccak": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-small-keccak/-/swiftness-small-keccak-0.0.7.tgz", + "integrity": "sha512-gceslp9dpsSJxEqUn7E1uosyxB3ITKoWmCrcGPmIysB8Ff7/L1D8UxG4F5gsAdMsz5pdhAs0qKjMHl85953rhg==" + }, + "node_modules/swiftness-starknet-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-starknet-blake2s/-/swiftness-starknet-blake2s-0.0.7.tgz", + "integrity": "sha512-4FCoVxuNLgdwFMOcfvYsCjnHy8XpODlIpScgaAtBtDkNK3X4i+PwtJSxKDEIf++jIfJJXQL0Wz/IL25pyPGg7Q==" + }, + "node_modules/swiftness-starknet-keccak": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-starknet-keccak/-/swiftness-starknet-keccak-0.0.7.tgz", + "integrity": "sha512-dU4UB10CGq/Me4ZGdSK+IegZPLsEEruHO+2lxrqScaVhi+Kpkkp4IYBRbMfwEeSA8ppISZR7MGzxujrLCqzCLQ==" + }, + "node_modules/swiftness-starknet-with-keccak-blake2s": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-starknet-with-keccak-blake2s/-/swiftness-starknet-with-keccak-blake2s-0.0.7.tgz", + "integrity": "sha512-Eeok1U3qbRDpJKL78WVwRo43x/C8dngnHzQl4CV5ILWdU3WIWPFjDilR7PeoVUTmGUOyUORYTyH/txA768UnfQ==" + }, + "node_modules/swiftness-starknet-with-keccak-keccak": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/swiftness-starknet-with-keccak-keccak/-/swiftness-starknet-with-keccak-keccak-0.0.7.tgz", + "integrity": "sha512-2axDJTodoxTRXSAqiBie84woX5/WwC14Rf9g7lK9A7sQkmMyDZSkEuqUIIsP6l6PmqYUjkXIJGOMrNkP7YW4uA==" + }, "node_modules/tailwindcss": { "version": "3.4.4", "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.4.tgz", diff --git a/dashboard/package.json b/dashboard/package.json index 46b0c5a..510dbbd 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -18,7 +18,19 @@ "react": "^18", "react-dom": "^18", "react-dropzone": "^14.2.3", - "zod": "^3.23.8" + "zod": "^3.23.8", + "swiftness-dex-blake2s": "^0.0.7", + "swiftness-dex-keccak": "^0.0.7", + "swiftness-recursive-blake2s": "^0.0.7", + "swiftness-recursive-keccak": "^0.0.7", + "swiftness-recursive-with-poseidon-blake2s": "^0.0.7", + "swiftness-recursive-with-poseidon-keccak": "^0.0.7", + "swiftness-small-blake2s": "^0.0.7", + "swiftness-small-keccak": "^0.0.7", + "swiftness-starknet-blake2s": "^0.0.7", + "swiftness-starknet-keccak": "^0.0.7", + "swiftness-starknet-with-keccak-blake2s": "^0.0.7", + "swiftness-starknet-with-keccak-keccak": "^0.0.7" }, "devDependencies": { "@types/node": "^20", diff --git a/dashboard/src/app/api.ts b/dashboard/src/app/api.ts index b7087d3..25733cb 100644 --- a/dashboard/src/app/api.ts +++ b/dashboard/src/app/api.ts @@ -1,31 +1,36 @@ import { z } from "zod"; +const hexStringSchema = z.string().regex(/^[0-9a-fA-F]+$/, 'Invalid hex string'); +const base58Pattern = /^[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]+$/; +const base58Schema = z.string().regex(base58Pattern, 'Invalid Base58 string'); +const bytesSchema = z.array(z.number()); + // 
Zod for DelegateRequest
 export const DelegateRequest = z.object({
-  pie: z.array(z.number()),
+  pie: bytesSchema,
 });
 export type DelegateRequest = z.infer<typeof DelegateRequest>;
 
 // Zod for DelegateResponse
 export const DelegateResponse = z.object({
-  job_hash: z.coerce.bigint(),
+  job_key: hexStringSchema,
 });
 export type DelegateResponse = z.infer<typeof DelegateResponse>;
 
 // Zod for JobEventsRequest
 export const JobEventsRequest = z.object({
-  job_hash: z.coerce.bigint(),
+  job_key: hexStringSchema,
 });
 export type JobEventsRequest = z.infer<typeof JobEventsRequest>;
 
 export const JobEventsResponse = z.object({
-  type: z.literal("Finished"),
+  type: z.literal("Finished").or(z.literal("Delegated")).or(z.literal("BidReceived")),
   data: z.any(),
 });
 export type JobEventsResponse = z.infer<typeof JobEventsResponse>;
 
-export const JobHash = z.coerce.bigint();
-export type JobHash = z.infer<typeof JobHash>;
-
-export const Proof = z.array(z.number());
+export const Proof = bytesSchema;
 export type Proof = z.infer<typeof Proof>;
+
+export const PeerId = base58Schema;
+export type PeerId = z.infer<typeof PeerId>;
\ No newline at end of file
diff --git a/dashboard/src/app/globals.css b/dashboard/src/app/globals.css
index 0af548e..52df725 100644
--- a/dashboard/src/app/globals.css
+++ b/dashboard/src/app/globals.css
@@ -51,3 +51,13 @@ body {
   filter: blur(5px);
   z-index: -1; /* Ensure the image is behind any content in .background */
 }
+
+/* Hide scrollbar for WebKit browsers (Chrome, Safari) */
+.scroll-container::-webkit-scrollbar {
+  display: none; /* Hide scrollbar */
+}
+
+/* Hide scrollbar for Firefox */
+.scroll-container {
+  scrollbar-width: none; /* Hide scrollbar */
+}
diff --git a/dashboard/src/app/page.tsx b/dashboard/src/app/page.tsx
index 80973be..1216a41 100644
--- a/dashboard/src/app/page.tsx
+++ b/dashboard/src/app/page.tsx
@@ -16,14 +16,18 @@ import {
   DelegateRequest,
   DelegateResponse,
   JobEventsResponse,
+  PeerId,
   Proof,
 } from "./api";
-import { useState } from "react";
+import { useEffect, useRef, useState } from "react";
 import subscribeEvents from "./subscribeEvents";
-import assert from "assert";
+import { WorkerMessage, WorkerResponse } from "@/utils/types";
+import { matchCommitment, matchLayout } from "@/utils/loadModule";
 
 const steps = [
   "Job propagated to network",
+  "Job bidding",
+  "Job delegated",
   "Proof received",
 ];
@@ -80,7 +84,9 @@ function QontoStepIcon(props: StepIconProps) {
   );
 }
 
+
 export default function Home() {
+  const workerRef = useRef<Worker>();
   const darkTheme = createTheme({
     palette: {
       mode: "dark",
     },
   });
 
+  // Function to add a new log entry
+  const addLog = (message: string) => {
+    const now = new Date();
+    const timestamp = now.toLocaleString(); // Get current date and time as a string
+    const logEntry = (
+      <div>
+        LOG: {timestamp} - {message}
+      </div>
+    );
+    setLogs((prevLogs) => [...prevLogs, logEntry]);
+  };
+
+  const verifyProof = async (proof: string) => {
+    const parsedProof = JSON.parse(proof);
+
+    const layout = matchLayout(parsedProof.public_input.layout);
+    const commitment = matchCommitment(parsedProof.proof_parameters.pow_hash);
+
+    workerRef.current = new Worker(new URL("../worker.ts", import.meta.url), {
+      type: "module",
+    });
+
+    workerRef.current.onmessage = (event: MessageEvent<WorkerResponse>) => {
+      const { programHash, programOutput, error } = event.data;
+
+      if (error) {
+        console.error(error);
+        addLog("Verification Failed");
+        setButtonColor("error");
+      } else {
+        addLog(`programHash: ${programHash}`);
+        addLog(`programOutput: ${programOutput}`);
+        addLog(`layout: ${layout}`);
+        addLog(`commitment: ${commitment}`);
+        addLog("Proof Verified");
+        setButtonColor("success");
+      }
+
+      workerRef.current?.terminate();
+    };
+
+    if (layout && commitment) {
+      const message: WorkerMessage = {
+        proof,
+        layout,
+        commitment,
+      };
+
+      workerRef.current.postMessage(message);
+    }
+  };
+
   const ondrop = <T extends File>(
     acceptedFiles: T[],
     _fileRejections: FileRejection[],
@@ -104,8 +162,6 @@ export default function Home() {
       pie: Array.from(fileBytes),
     });
 
-    console.log(requestBody);
-
     let subscriber: EventSource | null = null;
 
     try {
@@ -127,25 +183,36 @@ export default function Home() {
       const data: DelegateResponse = DelegateResponse.parse(
         await response.json(),
       );
-      console.log("Job hash:", data.job_hash);
-
+      addLog(`Job ${data.job_key} sent to the p2p network`);
       setActiveStep(1);
-      setIsProcessing(data.job_hash);
+      setIsProcessing(data.job_key);
 
       subscriber = subscribeEvents(
         `${process.env.NEXT_PUBLIC_API_URL}/job_events`,
-        `job_hash=${data.job_hash.toString()}`,
-        (event) => {
+        `job_key=${data.job_key.toString()}`,
+        async (event) => {
           let job_event = JobEventsResponse.parse(event);
+          if (job_event.type == "BidReceived") {
+            let peer_id = PeerId.parse(job_event.data);
+            addLog(`Received bid for job ${data.job_key} from peer ${peer_id}`);
+            setActiveStep(2);
+          }
+          if (job_event.type == "Delegated") {
+            let peer_id = PeerId.parse(job_event.data);
+            addLog(`Job ${data.job_key} delegated to peer ${peer_id}`);
+            setActiveStep(3);
+          }
           if (job_event.type == "Finished") {
             let proof = Proof.parse(job_event.data);
-            setActiveStep(3);
+            addLog(`Job ${data.job_key} proof received`);
+            setActiveStep(4);
             setDownloadBlob([
               new Blob([new Uint8Array(proof)]),
-              `${data.job_hash}_proof.json`,
+              `${data.job_key}_proof.json`,
             ]);
             setIsProcessing(null);
             subscriber?.close();
+            await verifyProof(new TextDecoder().decode(new Uint8Array(job_event.data)));
           }
         },
       );
@@ -160,14 +227,32 @@ export default function Home() {
     reader.readAsArrayBuffer(file);
   };
 
-  const [isProcessing, setIsProcessing] = useState<bigint | null>(null);
+  const [isProcessing, setIsProcessing] = useState<string | null>(null);
+  const [logs, setLogs] = useState<JSX.Element[]>([]);
   const [activeStep, setActiveStep] = useState(0);
   const [downloadBlob, setDownloadBlob] = useState<[Blob, string] | null>(null);
+  const [buttonColor, setButtonColor] = useState<
+    | "inherit"
+    | "primary"
+    | "secondary"
+    | "success"
+    | "error"
+    | "info"
+    | "warning"
+  >("primary");
 
   const { getRootProps, getInputProps, isDragActive } = useDropzone({
     onDrop: ondrop,
   });
 
+  const scrollContainerRef = useRef<HTMLDivElement>(null);
+
+  useEffect(() => {
+    if (scrollContainerRef.current) {
+      scrollContainerRef.current.scrollTop = scrollContainerRef.current.scrollHeight;
+    }
+  }, [logs]);
+
   return (
@@ -213,10 +298,16 @@ export default function Home() { ))} +
+ {logs.map((log, index) => ( +
{log}
+ ))} +