Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Aug 14, 2024
1 parent 6682339 commit d37ead4
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 57 deletions.
66 changes: 30 additions & 36 deletions crates/delegator/src/bid_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use zetina_common::process::Process;

pub struct BidQueue {}

pub type BidQueueResult = Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>;

impl BidQueue {
pub fn new() -> Self {
Self {}
Expand All @@ -26,46 +28,38 @@ impl BidQueue {
) {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
let future: Pin<
Box<
dyn Future<
Output = Result<
(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
BidControllerError,
>,
> + Send
+ '_,
>,
> = Box::pin(async move {
let duration = Duration::from_secs(5);
let mut bids: Option<BTreeMap<u64, Vec<PeerId>>> = Some(BTreeMap::new());
loop {
tokio::select! {
Some((price, peerid)) = bid_rx.recv() => {
match &mut bids {
Some(bids) => {
match bids.get_mut(&price) {
Some(vec) => {
vec.push(peerid);
},
None => {
bids.insert(price, vec![peerid]);
let future: Pin<Box<dyn Future<Output = BidQueueResult> + Send + '_>> = Box::pin(
async move {
let duration = Duration::from_secs(5);
let mut bids: Option<BTreeMap<u64, Vec<PeerId>>> = Some(BTreeMap::new());
loop {
tokio::select! {
Some((price, peerid)) = bid_rx.recv() => {
match &mut bids {
Some(bids) => {
match bids.get_mut(&price) {
Some(vec) => {
vec.push(peerid);
},
None => {
bids.insert(price, vec![peerid]);
}
}
}
},
None => break Err(BidControllerError::BidsTerminated)
},
None => break Err(BidControllerError::BidsTerminated)
}
}
_ = sleep(duration) => {
break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
}
_ = terminate_rx.recv() => {
break Err(BidControllerError::TaskTerminated);
}
else => break Err(BidControllerError::TaskTerminated)
}
_ = sleep(duration) => {
break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
}
_ = terminate_rx.recv() => {
break Err(BidControllerError::TaskTerminated);
}
else => break Err(BidControllerError::TaskTerminated)
}
}
});
},
);

(Process::new(future, terminate_tx), bid_tx)
}
Expand Down
11 changes: 5 additions & 6 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
FieldElement::from_byte_slice_be(private_key.as_slice()).unwrap(),
);

let mut swarm_runner = SwarmRunner::new(
let swarm_runner = SwarmRunner::new(
cli.listen_address.parse()?,
cli.dial_addresses
.iter()
.map(|addr| Multiaddr::from_str(addr))
.collect::<Result<Vec<Multiaddr>, _>>()?,
p2p_keypair,
Multiaddr::from_str(&cli.address).unwrap(),
)?;

cli.dial_addresses
.into_iter()
.try_for_each(|addr| swarm_runner.swarm.dial(Multiaddr::from_str(&addr).unwrap()))
.unwrap();

let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
Expand Down
13 changes: 6 additions & 7 deletions crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.join("../../");
let bootloader_program_path = ws_root.join("target/bootloader.json");

let mut swarm_runner = SwarmRunner::new(
let swarm_runner = SwarmRunner::new(
cli.listen_address.parse()?,
cli.dial_addresses
.iter()
.map(|addr| Multiaddr::from_str(addr))
.collect::<Result<Vec<Multiaddr>, _>>()?,
p2p_keypair,
Multiaddr::from_str(&cli.address).unwrap(),
Multiaddr::from_str(&cli.address)?,
)?;

cli.dial_addresses
.into_iter()
.try_for_each(|addr| swarm_runner.swarm.dial(Multiaddr::from_str(&addr).unwrap()))
.unwrap();

let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);
Expand Down
18 changes: 14 additions & 4 deletions crates/peer/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct PeerBehaviour {

pub struct SwarmRunner {
pub swarm: Swarm<PeerBehaviour>,
pub listen_multiaddr: Multiaddr,
pub dial_multiaddrs: Vec<Multiaddr>,
pub p2p_keypair: Keypair,
pub p2p_multiaddr: Multiaddr,
}

Expand Down Expand Up @@ -88,12 +91,14 @@ pub enum DelegationMessage {
impl SwarmRunner {
pub fn new(
listen_multiaddr: Multiaddr,
dial_multiaddrs: Vec<Multiaddr>,
p2p_keypair: Keypair,
p2p_multiaddr: Multiaddr,
) -> Result<Self, Box<dyn std::error::Error>> {
let mut config = Config::default();
config.set_max_packet_size(1024 * 1024 * 100);
let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair)
config.set_query_timeout(Duration::from_secs(60));
let mut swarm = SwarmBuilder::with_existing_identity(p2p_keypair.to_owned())
.with_tokio()
.with_tcp(
tcp::Config::default().port_reuse(true),
Expand All @@ -115,17 +120,19 @@ impl SwarmRunner {
),
gossipsub: Self::init_gossip(p2p_keypair).unwrap(),
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10)))
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Networking.as_str()))?;
swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Market.as_str()))?;
swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(Topic::Delegation.as_str()))?;
swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
// swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
swarm.listen_on(listen_multiaddr)?;
swarm.listen_on(listen_multiaddr.to_owned())?;

Ok(SwarmRunner { swarm, p2p_multiaddr })
dial_multiaddrs.iter().try_for_each(|addr| swarm.dial(addr.clone()))?;

Ok(SwarmRunner { swarm, listen_multiaddr, dial_multiaddrs, p2p_keypair, p2p_multiaddr })
}

fn init_gossip(
Expand Down Expand Up @@ -229,6 +236,9 @@ impl SwarmRunner {
if num_established == 0 {
self.swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, endpoint.get_remote_address());
if let Err(err) = self.swarm.dial(endpoint.get_remote_address().to_owned()) {
error!("Failed to re-dial peer: {err:?}");
}
}
}
SwarmEvent::Behaviour(PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { id, result, stats, step })) => {
Expand Down
1 change: 0 additions & 1 deletion crates/prover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod errors;
pub mod stone_prover;
#[allow(async_fn_in_trait)]
pub mod traits;
4 changes: 2 additions & 2 deletions crates/prover/src/stone_prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ pub fn params(n_steps: u64) -> Params {
)
.collect(),
last_layer_degree_bound,
n_queries: 20,
n_queries: 10,
proof_of_work_bits: 30,
},
log_n_cosets: 2,
log_n_cosets: 1,
},
..Default::default()
}
Expand Down
1 change: 0 additions & 1 deletion crates/runner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod cairo_runner;
pub mod errors;
#[allow(async_fn_in_trait)]
pub mod traits;

0 comments on commit d37ead4

Please sign in to comment.