Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed hash table #27

Merged
merged 4 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::hash;
use cairo_vm::vm::runners::cairo_pie::CairoPie;
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use serde::{Deserialize, Serialize};
use starknet::signers::{SigningKey, VerifyingKey};
use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
Expand Down Expand Up @@ -101,17 +101,10 @@ impl Display for Job {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobBid {
pub identity: PeerId,
pub job_hash: u64,
pub price: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JobDelegation {
pub identity: PeerId,
pub job: Job,
pub job_key: kad::RecordKey,
pub price: u64,
}

Expand Down
35 changes: 32 additions & 3 deletions crates/common/src/job_trace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::hash;
use libp2p::kad;
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
mem::ManuallyDrop,
};
use tempfile::NamedTempFile;

Expand All @@ -13,11 +15,38 @@ use tempfile::NamedTempFile;

#[derive(Debug)]
pub struct JobTrace {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub air_public_input: NamedTempFile, // Temporary file containing the public input
pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
pub memory: ManuallyDrop<NamedTempFile>, // Temporary file containing memory data (required for air_private_input validity)
pub trace: ManuallyDrop<NamedTempFile>, // Temporary file containing trace data (required for air_private_input validity)
}

impl JobTrace {
pub fn new(
job_key: kad::RecordKey,
air_public_input: NamedTempFile,
air_private_input: NamedTempFile,
memory: NamedTempFile,
trace: NamedTempFile,
) -> Self {
Self {
job_key,
air_public_input,
air_private_input,
memory: ManuallyDrop::new(memory),
trace: ManuallyDrop::new(trace),
}
}
}

impl Drop for JobTrace {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.memory);
ManuallyDrop::drop(&mut self.trace);
}
}
}

impl Hash for JobTrace {
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::hash;
use libp2p::kad;
use serde::{Deserialize, Serialize};
use std::{
fmt::Display,
Expand All @@ -14,7 +15,7 @@ use std::{

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub proof: Vec<u8>,
}

Expand Down
30 changes: 16 additions & 14 deletions crates/delegator/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
};
use futures::StreamExt;
use hyper::StatusCode;
use libp2p::PeerId;
use libp2p::kad;
use serde::{Deserialize, Serialize};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{io, time::Duration};
Expand All @@ -19,7 +19,7 @@ use crate::delegator::DelegatorEvent;
#[derive(Debug)]
pub struct ServerState {
pub delegate_tx: mpsc::Sender<JobData>,
pub events_rx: broadcast::Receiver<(u64, DelegatorEvent)>,
pub events_rx: broadcast::Receiver<(kad::RecordKey, DelegatorEvent)>,
}

impl Clone for ServerState {
Expand All @@ -39,29 +39,29 @@ pub struct DelegateRequest {

#[derive(Debug, Serialize)]
pub struct DelegateResponse {
job_hash: String,
job_key: String,
}

pub async fn deletage_handler(
State(state): State<ServerState>,
Json(input): Json<DelegateRequest>,
) -> Result<Json<DelegateResponse>, StatusCode> {
let job_data = JobData::new(input.pie);
let job_data_hash = hash!(&job_data);
let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
Ok(Json(DelegateResponse { job_key: hex::encode(job_data_hash) }))
}

#[derive(Debug, Deserialize)]
pub struct JobEventsRequest {
job_hash: String,
job_key: String,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum JobEventsResponse {
BidReceived(PeerId),
Delegated(PeerId),
BidReceived(String),
Delegated(String),
Finished(Vec<u8>),
}

Expand All @@ -70,17 +70,19 @@ pub async fn job_events_handler(
Query(input): Query<JobEventsRequest>,
) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
let stream = stream! {
let job_hash = input.job_hash.parse::<u64>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
let job_key = kad::RecordKey::new(
&hex::decode(input.job_key)
.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?
);
loop {
tokio::select! {
Ok((hash, event)) = state.events_rx.recv() => {
if hash == job_hash {
Ok((key, event)) = state.events_rx.recv() => {
if key == job_key {
yield Event::default()
.json_data(
match event {
DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id) },
DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id) },
DelegatorEvent::BidReceived(peer_id) => { JobEventsResponse::BidReceived(peer_id.to_base58()) },
DelegatorEvent::Delegated(peer_id) => { JobEventsResponse::Delegated(peer_id.to_base58()) },
DelegatorEvent::Finished(data) => { JobEventsResponse::Finished(data) },
}
)
Expand Down
18 changes: 11 additions & 7 deletions crates/delegator/src/bid_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use std::{collections::BTreeMap, future::Future, pin::Pin, time::Duration};
use tokio::{sync::mpsc, time::sleep};
use zetina_common::{job::Job, process::Process};
use zetina_common::process::Process;

pub struct BidQueue {}

Expand All @@ -19,17 +19,21 @@ impl Default for BidQueue {

impl BidQueue {
pub fn run<'future>(
job: Job,
job_hash: kad::RecordKey,
) -> (
Process<'future, Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<'future, Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
mpsc::Sender<(u64, PeerId)>,
) {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
let future: Pin<
Box<
dyn Future<Output = Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>
+ Send
dyn Future<
Output = Result<
(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
BidControllerError,
>,
> + Send
+ '_,
>,
> = Box::pin(async move {
Expand All @@ -53,7 +57,7 @@ impl BidQueue {
}
}
_ = sleep(duration) => {
break Ok((job, bids.take().ok_or(BidControllerError::BidsTerminated)?))
break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
}
_ = terminate_rx.recv() => {
break Err(BidControllerError::TaskTerminated);
Expand Down
89 changes: 63 additions & 26 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::bid_queue::{BidControllerError, BidQueue};
use futures::stream::FuturesUnordered;
use futures::Stream;
use libp2p::{gossipsub, PeerId};
use libp2p::{gossipsub, kad, PeerId};
use starknet::signers::SigningKey;
use std::collections::{BTreeMap, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};
Expand All @@ -13,10 +13,11 @@ use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::hash;
use zetina_common::job::{Job, JobData, JobDelegation};
use zetina_common::job::{Job, JobBid, JobData};
use zetina_common::job_witness::JobWitness;
use zetina_common::process::Process;
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
};

pub struct Delegator {
Expand All @@ -27,68 +28,99 @@ impl Delegator {
pub fn new(
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
kademlia_tx: Sender<KademliaMessage>,
mut delegate_rx: mpsc::Receiver<JobData>,
events_tx: broadcast::Sender<(u64, DelegatorEvent)>,
events_tx: broadcast::Sender<(kad::RecordKey, DelegatorEvent)>,
signing_key: SigningKey,
) -> Self {
Self {
handle: Some(tokio::spawn(async move {
let mut job_bid_scheduler = FuturesUnordered::<
Process<Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<
Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>,
>,
>::new();
let mut job_hash_store = HashMap::<u64, mpsc::Sender<(u64, PeerId)>>::new();
let mut job_hash_store =
HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
let mut proof_hash_store = HashMap::<kad::RecordKey, kad::RecordKey>::new();

loop {
tokio::select! {
Some(job_data) = delegate_rx.recv() => {
let job = Job::try_from_job_data(job_data, &signing_key);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hash!(job));
let (process, bid_tx) = BidQueue::run(job.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(hash!(job), bid_tx);
let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes());
kademlia_tx.send(KademliaMessage::PUT(
(job_key, serde_json::to_vec(&job)?)
)).await?;
},
Some(event) = swarm_events.next() => {
match event {
PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
if message.topic == Topic::Market.into() {
match serde_json::from_slice::<MarketMessage>(&message.data)? {
MarketMessage::JobBid(job_bid) => {
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_hash) {
info!("Received job bid: {} price: {} from: {}", job_bid.job_hash, job_bid.price, job_bid.identity);
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_key) {
info!("Received job bid: {} price: {} from: {}", hex::encode(&job_bid.job_key), job_bid.price, job_bid.identity);
bid_tx.send((job_bid.price, job_bid.identity)).await?;
events_tx.send((job_bid.job_hash, DelegatorEvent::BidReceived(job_bid.identity)))?;
events_tx.send((job_bid.job_key, DelegatorEvent::BidReceived(job_bid.identity)))?;
}
}
_ => {}
}
}
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", job_witness.job_hash);
events_tx.send((job_witness.job_hash, DelegatorEvent::Finished(job_witness.proof)))?;
DelegationMessage::Finished(proof_key, job_key) => {
if job_hash_store.remove(&job_key).is_some() {
info!("Received finished job: {} proof key: {}", hex::encode(&job_key), hex::encode(&proof_key));
proof_hash_store.insert(proof_key.to_owned(), job_key);
kademlia_tx.send(KademliaMessage::GET(proof_key)).await?;
}
}
_ => {}
}
}
},
PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..}) => {
match result {
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::JobBidPropagation(key.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hex::encode(&key));
let (process, bid_tx) = BidQueue::run(key.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(key, bid_tx);
},
kad::QueryResult::GetRecord(Ok(
kad::GetRecordOk::FoundRecord(kad::PeerRecord {
record: kad::Record { key, value, .. },
..
})
)) => {
if let Some ((proof_key, job_key)) = proof_hash_store.remove_entry(&key) {
info!("job {} proof with key: {} returned in DHT", hex::encode(&job_key), hex::encode(&proof_key));
let job_witness: JobWitness = serde_json::from_slice(&value)?;
events_tx.send((job_key, DelegatorEvent::Finished(job_witness.proof)))?;
}
},
_ => {}
}
}
_ => {}
}
}
Some(Ok((job, bids))) = job_bid_scheduler.next() => {
job_hash_store.remove(&hash!(job));
Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
let bid = bids.first_key_value().unwrap();
let price = *bid.0;
let identity = *bid.1.first().unwrap();
info!("Job {} delegated to best bidder: {}", hash!(job), identity);
info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{identity, job: job.to_owned(), price}))?
data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))?
}).await?;
events_tx.send((hash!(job), DelegatorEvent::Delegated(identity)))?;
events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
}
_ = shutdown_signal() => {
break
Expand Down Expand Up @@ -125,8 +157,13 @@ pub enum Error {
#[error("mpsc_send_error GossipsubMessage")]
MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("mpsc_send_error KademliaMessage")]
MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),

#[error("mpsc_send_error DelegatorEvent")]
BreadcastSendErrorDelegatorEvent(#[from] broadcast::error::SendError<(u64, DelegatorEvent)>),
BreadcastSendErrorDelegatorEvent(
#[from] broadcast::error::SendError<(kad::RecordKey, DelegatorEvent)>,
),

#[error("mpsc_send_error JobBid")]
MpscSendErrorJobBid(#[from] mpsc::error::SendError<(u64, PeerId)>),
Expand Down
Loading
Loading