It took me some time to debug.
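recv() on ReqSocket is not cancel-safe because it calls self.current_request.take() before awaiting the peer's receive queue. If the future is dropped at that await point, for example when another branch of tokio::select! completes first, the pending request has already been cleared, so the next recv() fails with "Unable to recv. No request in progress". As a workaround, I have to spawn a dedicated tokio task just for ZMQ if I want to use tokio::select! with ReqSocket. There may be other places where cancellation is unsafe and leaves the socket in an inconsistent state.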
    #[async_trait]
    impl SocketRecv for ReqSocket {
        async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
            match self.current_request.take() {
                Some(peer_id) => {
                    if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
                        let message = peer.recv_queue.next().await;
                        match message {
                            Some(Ok(Message::Message(mut m))) => {
                                assert!(m.len() > 1);
                                assert!(m.pop_front().unwrap().is_empty()); // Ensure that we have delimeter as first part
                                Ok(m)
                            }
                            Some(Ok(_)) => todo!(),
                            Some(Err(error)) => Err(error.into()),
                            None => Err(ZmqError::NoMessage),
                        }
                    } else {
                        Err(ZmqError::Other("Server disconnected"))
                    }
                }
                None => Err(ZmqError::Other("Unable to recv. No request in progress")),
            }
        }
    }
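To make the failure mode concrete, here is a minimal std-only sketch of the pattern, not the library's actual code. `FakeReqSocket`, `YieldOnce`, `recv_take_first`, `recv_clear_last`, `poll_once_then_drop` and `request_survives_cancel` are all hypothetical names invented for this illustration; `YieldOnce` stands in for the await on the receive queue, and polling once then dropping the future imitates what happens to the losing branch of a select. The second method shows one possible way to keep the state intact, assuming it is acceptable to clear the request only after the await completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the socket; it only models the request-tracking state.
struct FakeReqSocket {
    current_request: Option<u32>,
}

/// Pending on the first poll, ready on the second.
/// Stands in for awaiting the peer's receive queue.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

impl FakeReqSocket {
    /// Same shape as the code in this issue: the state is taken BEFORE the await.
    async fn recv_take_first(&mut self) -> Result<u32, &'static str> {
        match self.current_request.take() {
            Some(peer_id) => {
                YieldOnce(false).await; // dropping the future here loses peer_id
                Ok(peer_id)
            }
            None => Err("Unable to recv. No request in progress"),
        }
    }

    /// Assumed alternative: read the id, await, and clear the state only afterwards.
    async fn recv_clear_last(&mut self) -> Result<u32, &'static str> {
        match self.current_request {
            Some(peer_id) => {
                YieldOnce(false).await; // dropping the future here changes nothing
                self.current_request = None;
                Ok(peer_id)
            }
            None => Err("Unable to recv. No request in progress"),
        }
    }
}

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future once, reports whether it was pending, then drops it.
fn poll_once_then_drop<F: Future>(fut: F) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx).is_pending()
}

/// Returns true if the pending request is still recorded after a cancelled recv.
fn request_survives_cancel(clear_last: bool) -> bool {
    let mut sock = FakeReqSocket { current_request: Some(7) };
    let was_pending = if clear_last {
        poll_once_then_drop(sock.recv_clear_last())
    } else {
        poll_once_then_drop(sock.recv_take_first())
    };
    assert!(was_pending);
    sock.current_request.is_some()
}
```

In this model, `request_survives_cancel(false)` reports that the request was lost, while `request_survives_cancel(true)` reports that it is still there and a later recv could pick up where the cancelled one left off. Whether the same reordering is sufficient in the real implementation depends on the rest of the socket state, which this sketch does not model.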