-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ensure we reconnect on failure #173
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -261,13 +261,17 @@ impl Client { | |
} | ||
// wait for at least one endpoint to connect | ||
futures::future::select_all(endpoints.iter().map(|x| x.connected().boxed())).await; | ||
// Sort by health score | ||
endpoints.sort_by_key(|endpoint| std::cmp::Reverse(endpoint.health().score())); | ||
// Pick the first one | ||
endpoints[0].clone() | ||
|
||
endpoints | ||
.iter() | ||
.max_by_key(|endpoint| endpoint.health().score()) | ||
.expect("No endpoints") | ||
.clone() | ||
}; | ||
|
||
let mut selected_endpoint = healthiest_endpoint(None).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is important: it ensures at least one endpoint is connected. Selecting just the first one may result in an endpoint connection failure, and it never connects. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case we need a test. The current behaviour makes unit tests non-deterministic, as the client may connect to any of the dummy servers, so it is best to fix the wait-for-connect behaviour anyway |
||
let mut selected_endpoint = endpoints[0].clone(); | ||
|
||
selected_endpoint.connected().await; | ||
|
||
let handle_message = |message: Message, endpoint: Arc<Endpoint>, rotation_notify: Arc<Notify>| { | ||
let tx = message_tx_bg.clone(); | ||
|
@@ -422,6 +426,10 @@ impl Client { | |
_ = selected_endpoint.health().unhealthy() => { | ||
// Current selected endpoint is unhealthy, try to rotate to another one. | ||
// In case of all endpoints are unhealthy, we don't want to keep rotating but stick with the healthiest one. | ||
|
||
// The ws client maybe in a state that requires a reconnect | ||
selected_endpoint.reconnect().await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will execute the moment the endpoint becomes unhealthy, and when that happens it will try to reconnect. I don't think this extra reconnect will help. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no reconnect currently; we have to drop and re-create the client to actually reconnect. Currently it will always fail if the remote drops the connection, and it will never be able to connect to it again |
||
|
||
let new_selected_endpoint = healthiest_endpoint(None).await; | ||
if new_selected_endpoint.url() != selected_endpoint.url() { | ||
tracing::warn!("Switch to endpoint: {new_url}", new_url=new_selected_endpoint.url()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -290,3 +290,46 @@ async fn health_check_works() { | |
handle1.stop().unwrap(); | ||
handle2.stop().unwrap(); | ||
} | ||
|
||
#[tokio::test] | ||
async fn reconnect_on_disconnect() { | ||
let (addr1, handle1, mut rx1, _) = dummy_server().await; | ||
let (addr2, handle2, mut rx2, _) = dummy_server().await; | ||
|
||
let client = Client::new( | ||
[format!("ws://{addr1}"), format!("ws://{addr2}")], | ||
Some(Duration::from_millis(100)), | ||
None, | ||
Some(2), | ||
None, | ||
) | ||
.unwrap(); | ||
|
||
let h1 = tokio::spawn(async move { | ||
let _req = rx1.recv().await.unwrap(); | ||
// no response, let it timeout | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A request timeout will make the endpoint unhealthy; therefore it will try to reconnect itself |
||
tokio::time::sleep(Duration::from_millis(200)).await; | ||
}); | ||
|
||
let h2 = tokio::spawn(async move { | ||
let req = rx2.recv().await.unwrap(); | ||
req.respond(json!(1)); | ||
}); | ||
|
||
let h3 = tokio::spawn(async move { | ||
let res = client.request("mock_rpc", vec![]).await; | ||
assert_eq!(res.unwrap(), json!(1)); | ||
|
||
tokio::time::sleep(Duration::from_millis(2000)).await; | ||
|
||
assert_eq!(client.endpoints()[0].connect_counter(), 2); | ||
assert_eq!(client.endpoints()[1].connect_counter(), 1); | ||
}); | ||
|
||
h3.await.unwrap(); | ||
h1.await.unwrap(); | ||
h2.await.unwrap(); | ||
|
||
handle1.stop().unwrap(); | ||
handle2.stop().unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::sync::Notify
may be a better option. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason will be displayed to describe this comment to others. Learn more. Notify is a one-off thing, but we may need to reconnect multiple times.