Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working on reconnect logic #154

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions examples/async_helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Helper functions to be runtime agnostic
use futures::Future;

#[cfg(feature = "tokio-runtime")]
extern crate tokio;
Expand All @@ -20,3 +21,23 @@ pub async fn sleep(duration: std::time::Duration) {
pub async fn sleep(duration: std::time::Duration) {
async_std::task::sleep(duration).await
}

#[allow(unused)]
#[cfg(feature = "tokio-runtime")]
pub fn spawn<T>(future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future);
}

#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
pub fn spawn<T>(future: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
async_std::spawn(future);
}
24 changes: 24 additions & 0 deletions examples/dealer_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
mod async_helpers;

use futures::StreamExt;
use std::{error::Error, time::Duration};
use zeromq::prelude::*;

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut client = zeromq::DealerSocket::new();
let mut monitor = client.monitor();
async_helpers::spawn(async move {
while let Some(event) = monitor.next().await {
dbg!(event);
}
});

client.connect("tcp://127.0.0.1:5559").await?;

loop {
let result = client.send("Test message".into()).await;
dbg!(result);
async_helpers::sleep(Duration::from_secs(1)).await;
}
}
17 changes: 17 additions & 0 deletions examples/router_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
mod async_helpers;

use std::{convert::TryFrom, error::Error};
use zeromq::{prelude::*, util::PeerIdentity, SocketOptions};

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut options = SocketOptions::default();
options.peer_identity(PeerIdentity::try_from(Vec::from("SomeCustomId")).unwrap());
let mut frontend = zeromq::RouterSocket::with_options(options);
frontend.bind("tcp://127.0.0.1:5559").await?;

loop {
let message = frontend.recv().await?;
dbg!(message);
}
}
66 changes: 49 additions & 17 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::util::{self, PeerIdentity};
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
async_rt, Endpoint, MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType,
ZmqError, ZmqResult,
};
use async_trait::async_trait;
use crossbeam::queue::SegQueue;
Expand All @@ -23,6 +24,7 @@ pub(crate) struct GenericSocketBackend {
socket_type: SocketType,
socket_options: SocketOptions,
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
connect_endpoints: DashMap<PeerIdentity, Endpoint>,
}

impl GenericSocketBackend {
Expand All @@ -38,10 +40,14 @@ impl GenericSocketBackend {
socket_type,
socket_options: options,
socket_monitor: Mutex::new(None),
connect_endpoints: DashMap::new(),
}
}

pub(crate) async fn send_round_robin(&self, message: Message) -> ZmqResult<PeerIdentity> {
pub(crate) async fn send_round_robin(
self: &Arc<Self>,
message: Message,
) -> ZmqResult<PeerIdentity> {
// In normal scenario this will always be only 1 iteration
// There can be special case when peer has disconnected and his id is still in
// RR queue This happens because SegQueue don't have an api to delete
Expand Down Expand Up @@ -71,7 +77,7 @@ impl GenericSocketBackend {
Ok(next_peer_id)
}
Err(e) => {
self.peer_disconnected(&next_peer_id);
self.clone().peer_disconnected(&next_peer_id);
Err(e.into())
}
};
Expand Down Expand Up @@ -99,25 +105,51 @@ impl SocketBackend for GenericSocketBackend {

#[async_trait]
impl MultiPeerBackend for GenericSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(peer_id.clone(), Peer { send_queue });
self.round_robin.push(peer_id.clone());
match &self.fair_queue_inner {
None => {}
Some(inner) => {
inner.lock().insert(peer_id.clone(), recv_queue);
}
};

if let Some(queue_inner) = &self.fair_queue_inner {
queue_inner.lock().insert(peer_id.clone(), recv_queue);
}

if let Some(e) = endpoint {
self.connect_endpoints.insert(peer_id.clone(), e);
}
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}

self.peers.remove(peer_id);
match &self.fair_queue_inner {
None => {}
Some(inner) => {
inner.lock().remove(peer_id);
}
if let Some(inner) = &self.fair_queue_inner {
inner.lock().remove(peer_id);
}

let endpoint = match self.connect_endpoints.remove(peer_id) {
Some((_, e)) => e,
None => return,
};
let backend = self;

async_rt::task::spawn(async move {
let (socket, endpoint) = util::connect_forever(endpoint)
.await
.expect("Failed to connect");
let peer_id = util::peer_connected(socket, backend.clone(), Some(endpoint.clone()))
.await
.expect("Failed to handshake");
if let Some(monitor) = backend.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Connected(endpoint, peer_id));
}
});
}
}
21 changes: 14 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ pub trait MultiPeerBackend: SocketBackend {
/// This should not be public..
/// Find a better way of doing this

async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo);
fn peer_disconnected(&self, peer_id: &PeerIdentity);
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
);
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity);
}

pub trait SocketBackend: Send + Sync {
Expand Down Expand Up @@ -192,7 +197,7 @@ pub trait Socket: Sized + Send {
async move {
let result = match result {
Ok((socket, endpoint)) => {
match util::peer_connected(socket, cloned_backend.clone()).await {
match util::peer_connected(socket, cloned_backend.clone(), None).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
}
Expand Down Expand Up @@ -260,10 +265,12 @@ pub trait Socket: Sized + Send {
let endpoint = endpoint.try_into()?;

let result = match util::connect_forever(endpoint).await {
Ok((socket, endpoint)) => match util::peer_connected(socket, backend).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
},
Ok((socket, endpoint)) => {
match util::peer_connected(socket, backend, Some(endpoint.clone())).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
}
}
Err(e) => Err(e),
};
match result {
Expand Down
11 changes: 8 additions & 3 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ impl SocketBackend for PubSocketBackend {

#[async_trait]
impl MultiPeerBackend for PubSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (mut recv_queue, send_queue) = io.into_parts();
// TODO provide handling for recv_queue
let (sender, stop_receiver) = oneshot::channel();
Expand Down Expand Up @@ -143,7 +148,7 @@ impl MultiPeerBackend for PubSocketBackend {
});
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
log::info!("Client disconnected {:?}", peer_id);
self.subscribers.remove(peer_id);
}
Expand Down Expand Up @@ -197,7 +202,7 @@ impl SocketSend for PubSocket {
}
}
for peer in dead_peers {
self.backend.peer_disconnected(&peer);
self.backend.clone().peer_disconnected(&peer);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl SocketRecv for PullSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
};
Expand Down
9 changes: 7 additions & 2 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ impl Socket for RepSocket {

#[async_trait]
impl MultiPeerBackend for RepSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();

self.peers.insert(
Expand All @@ -89,7 +94,7 @@ impl MultiPeerBackend for RepSocketBackend {
.insert(peer_id.clone(), recv_queue);
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
if let Some(monitor) = self.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone()));
}
Expand Down
9 changes: 7 additions & 2 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ impl Socket for ReqSocket {

#[async_trait]
impl MultiPeerBackend for ReqSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(
peer_id.clone(),
Expand All @@ -141,7 +146,7 @@ impl MultiPeerBackend for ReqSocketBackend {
self.round_robin.push(peer_id.clone());
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
self.peers.remove(peer_id);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl SocketRecv for RouterSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
};
Expand Down
11 changes: 8 additions & 3 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ impl SocketBackend for SubSocketBackend {

#[async_trait]
impl MultiPeerBackend for SubSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
async fn peer_connected(
self: Arc<Self>,
peer_id: &PeerIdentity,
io: FramedIo,
endpoint: Option<Endpoint>,
) {
let (recv_queue, mut send_queue) = io.into_parts();

let subs_msgs: Vec<ZmqMessage> = self
Expand All @@ -106,7 +111,7 @@ impl MultiPeerBackend for SubSocketBackend {
};
}

fn peer_disconnected(&self, peer_id: &PeerIdentity) {
fn peer_disconnected(self: Arc<Self>, peer_id: &PeerIdentity) {
self.peers.remove(peer_id);
}
}
Expand Down Expand Up @@ -192,7 +197,7 @@ impl SocketRecv for SubSocket {
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
self.backend.peer_disconnected(&peer_id);
self.backend.clone().peer_disconnected(&peer_id);
}
None => todo!(),
}
Expand Down
3 changes: 2 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ pub(crate) async fn ready_exchange(
pub(crate) async fn peer_connected(
mut raw_socket: FramedIo,
backend: Arc<dyn MultiPeerBackend>,
endpoint: Option<Endpoint>,
) -> ZmqResult<PeerIdentity> {
greet_exchange(&mut raw_socket).await?;
let mut props = None;
Expand All @@ -191,7 +192,7 @@ pub(crate) async fn peer_connected(
props = Some(connect_ops);
}
let peer_id = ready_exchange(&mut raw_socket, backend.socket_type(), props).await?;
backend.peer_connected(&peer_id, raw_socket).await;
backend.peer_connected(&peer_id, raw_socket, endpoint).await;
Ok(peer_id)
}

Expand Down