Skip to content

Commit

Permalink
[rust] check route equality and keep existing route in case failure t…
Browse files Browse the repository at this point in the history
…o query route from remote (#710)
  • Loading branch information
glcrazier authored Apr 1, 2024
1 parent 79fcd01 commit 0149785
Showing 1 changed file with 83 additions and 5 deletions.
88 changes: 83 additions & 5 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,17 @@ where
"query route for topic={} success: route={:?}", topic, route
);
let route = Arc::new(route);
let prev = self
.route_table
.lock()
.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
let mut route_table_lock = self.route_table.lock();

// if message queues in previous and new route are the same, just keep the previous.
if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
if prev.queue == route.queue {
return Ok(Arc::clone(prev));
}
}

let prev =
route_table_lock.insert(topic.to_owned(), RouteStatus::Found(Arc::clone(&route)));
info!(self.logger, "update route for topic={}", topic);

if let Some(RouteStatus::Querying(Some(mut v))) = prev {
Expand All @@ -415,7 +422,12 @@ where
self.logger,
"query route for topic={} failed: error={}", topic, err
);
let prev = self.route_table.lock().remove(topic);
let mut route_table_lock = self.route_table.lock();
// keep the existing route if error occurs.
if let Some(RouteStatus::Found(prev)) = route_table_lock.get(topic) {
return Ok(Arc::clone(prev));
}
let prev = route_table_lock.remove(topic);
if let Some(RouteStatus::Querying(Some(mut v))) = prev {
for item in v.drain(..) {
let _ = item.send(Err(ClientError::new(
Expand Down Expand Up @@ -925,6 +937,72 @@ pub(crate) mod tests {
awaitility::at_most(Duration::from_secs(1)).until(|| handle.is_finished());
}

#[tokio::test]
async fn client_query_existing_route_with_failed_request() {
let client = new_client_for_test();
let message_queues = if let Ok(QueryRouteResponse {
status: _,
message_queues,
}) = new_topic_route_response()
{
message_queues
} else {
vec![]
};
client.route_table.lock().insert(
"DefaultCluster".to_string(),
RouteStatus::Found(Arc::new(Route {
index: AtomicUsize::new(0),
queue: message_queues,
})),
);

let mut mock = session::MockRPCClient::new();
mock.expect_query_route().return_once(|_| {
sleep(Duration::from_millis(200));
Box::pin(futures::future::ready(Err(ClientError::new(
ErrorKind::Server,
"server error",
"test",
))))
});

let result = client.topic_route_inner(mock, "DefaultCluster").await;
assert!(result.is_ok());
}

#[tokio::test]
async fn client_update_same_route() {
let client = new_client_for_test();

let mut mock = session::MockRPCClient::new();
mock.expect_query_route()
.return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));

let result = client.topic_route_inner(mock, "DefaultCluster").await;
assert!(result.is_ok());

let route = result.unwrap();
assert!(!route.queue.is_empty());
route.index.fetch_add(1, Ordering::Relaxed);

let topic = &route.queue[0].topic;
assert!(topic.is_some());

let topic = topic.clone().unwrap();
assert_eq!(topic.name, "DefaultCluster");
assert_eq!(topic.resource_namespace, "default");

mock = session::MockRPCClient::new();
mock.expect_query_route()
.return_once(|_| Box::pin(futures::future::ready(new_topic_route_response())));

let result2 = client.topic_route_inner(mock, "DefaultCluster").await;
assert!(result2.is_ok());

let route2 = result2.unwrap();
assert_eq!(1, route2.index.load(Ordering::Relaxed));
}
#[tokio::test]
async fn client_heartbeat() {
let response = Ok(HeartbeatResponse {
Expand Down

0 comments on commit 0149785

Please sign in to comment.