From 9c56e91cc964bf775c69db0c3c1eddc92dafa185 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Nov 2023 13:57:33 -0800 Subject: [PATCH] test: Cleanup outbound_balancer_waits_for_ready_endpoint (#2529) The outbound_balancer_waits_for_ready_endpoint test doesn't reliably demonstrate the behavior it is trying to test: we setup two identical endpoints, so our tests can't be sure that the response is coming from the correct endpoint. This change updates this test to provide backends that return varied responses so we can ensure the desired behavior. This change also improves the test's logging. The discovery test infrastructure is updated to allow overriding the logical service address. This aids disambiguating addresses in logs. --- .../app/integration/src/tests/discovery.rs | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/linkerd/app/integration/src/tests/discovery.rs b/linkerd/app/integration/src/tests/discovery.rs index 6ea36b57c2..67caf75c73 100644 --- a/linkerd/app/integration/src/tests/discovery.rs +++ b/linkerd/app/integration/src/tests/discovery.rs @@ -1,5 +1,6 @@ use crate::*; use linkerd2_proxy_api as pb; +use tokio::time::sleep; const HOST: &str = "disco.test.svc.cluster.local"; @@ -51,7 +52,7 @@ mod cross_version { let dstctl = controller::new(); let polctl = controller::policy(); - let _txs = send_default_dst(&dstctl, &polctl, &srv); + let _txs = send_default_dst(&dstctl, &polctl, &srv, None); let proxy = proxy::new() .controller(dstctl.run().await) @@ -77,7 +78,7 @@ mod cross_version { let dstctl = controller::new(); let polctl = controller::policy(); - let (_profile, _policy, _dst) = send_default_dst(&dstctl, &polctl, &srv); + let (_profile, _policy, _dst) = send_default_dst(&dstctl, &polctl, &srv, None); dstctl.no_more_destinations(); let proxy = proxy::new() @@ -106,11 +107,11 @@ mod cross_version { let dstctl = controller::new(); let polctl = controller::policy(); - let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv); + let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv, None); drop(dst); - let dst = dstctl.destination_tx(default_dst_name(&srv)); + let dst = dstctl.destination_tx(default_dst_name(srv.addr.port())); dst.send_addr(srv.addr); let proxy = proxy::new() @@ -149,7 +150,7 @@ mod cross_version { let dstctl = controller::new(); let polctl = controller::policy(); - let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv); + let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv, None); dst.send(up); @@ -294,7 +295,7 @@ mod cross_version { dst_tx1.send(up); // Wait for the reconnect to happen. TODO: Replace this flaky logic. - tokio::time::sleep(Duration::from_millis(1000)).await; + sleep(Duration::from_secs(1)).await; let rsp = initially_exists .request(initially_exists.request_builder("/")) @@ -314,11 +315,12 @@ mod cross_version { let dstctl = controller::new(); let _profile = dstctl.profile_tx_default(srv.addr, HOST); - let polctl = controller::policy().outbound_default(srv.addr, default_dst_name(&srv)); + let polctl = + controller::policy().outbound_default(srv.addr, default_dst_name(srv.addr.port())); // when the proxy requests the destination, don't respond. - let _dst_tx = dstctl.destination_tx(default_dst_name(&srv)); - let _txs = send_default_dst(&dstctl, &polctl, &srv); + let _dst_tx = dstctl.destination_tx(default_dst_name(srv.addr.port())); + let _txs = send_default_dst(&dstctl, &polctl, &srv, None); let proxy = proxy::new() .controller(dstctl.run().await) @@ -347,7 +349,7 @@ mod cross_version { let dstctl = controller::new(); let polctl = controller::policy(); - let _txs = send_default_dst(&dstctl, &polctl, &srv); + let _txs = send_default_dst(&dstctl, &polctl, &srv, None); let proxy = proxy::new() .controller( @@ -362,12 +364,12 @@ mod cross_version { .await; // Allow the control client to notice a connection error - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; // Allow our controller to start accepting connections, // and then wait a little bit so the client tries again. drop(tx); - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; let client = (test.mk_client)(&proxy, HOST); @@ -378,28 +380,30 @@ mod cross_version { } } -fn default_dst_name(srv: &server::Listening) -> String { - format!("{}:{}", HOST, srv.addr.port()) +fn default_dst_name(port: u16) -> String { + format!("{}:{}", HOST, port) } fn send_default_dst( dstctl: &controller::Controller, polctl: &policy::Controller, srv: &server::Listening, + svc: Option, ) -> ( controller::ProfileSender, policy::OutboundSender, controller::DstSender, ) { - let addr = srv.addr; - let name = default_dst_name(srv); - tracing::info!("Configuring resolution for {addr} {name}"); + let svc_addr = svc.unwrap_or(srv.addr); + let srv_addr = srv.addr; + let name = default_dst_name(svc_addr.port()); + tracing::info!("Configuring resolution for {svc_addr} {name}"); - let policy = polctl.outbound_tx_default(addr, name.clone()); - let profile = dstctl.profile_tx_default(addr, HOST); + let policy = polctl.outbound_tx_default(svc_addr, name.clone()); + let profile = dstctl.profile_tx_default(svc_addr, HOST); let dst = dstctl.destination_tx(name); - dst.send_addr(addr); + dst.send_addr(srv_addr); (profile, policy, dst) } @@ -422,31 +426,22 @@ mod http2 { outbound_error_reconnects_after_backoff } - #[tokio::test] + /// See https://github.com/linkerd/linkerd2/issues/2550 + #[tokio::test(flavor = "current_thread")] async fn outbound_balancer_waits_for_ready_endpoint() { - // See https://github.com/linkerd/linkerd2/issues/2550 let _t = trace_init(); - let srv1 = server::http2() - .route("/", "hello") - .route("/bye", "bye") - .run() - .await; - - let srv2 = server::http2() - .route("/", "hello") - .route("/bye", "bye") - .run() - .await; - let srv1_addr = srv1.addr; + let svc_addr = SocketAddr::from(([127, 0, 0, 1], 8080)); + let alpha = server::http2().route("/", "alpha").run().await; + let beta = server::http2().route("/", "beta").run().await; // Start with the first server. let dstctl = controller::new(); let polctl = controller::policy(); - let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv1); + let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &alpha, Some(svc_addr)); let proxy = proxy::new() - .outbound_ip(srv1.addr) + .outbound_ip(svc_addr) .controller(dstctl.run().await) .policy(polctl.run().await) .run() @@ -454,34 +449,48 @@ mod http2 { let client = client::http2(proxy.outbound, HOST); let metrics = client::http1(proxy.admin, "localhost"); - assert_eq!(client.get("/").await, "hello"); + assert_eq!(client.get("/").await, "alpha"); // Simulate the first server falling over without discovery // knowing about it... - srv1.join().await; - tokio::task::yield_now().await; + tracing::info!(%alpha.addr, "Stopping"); + let alpha_addr = alpha.addr; + alpha.join().await; - // Wait until the proxy has seen the `srv1` disconnect... + // Wait until the proxy has seen the `alpha` disconnect... metrics::metric("tcp_close_total") .label("peer", "dst") .label("direction", "outbound") - .label("target_addr", srv1_addr.to_string()) + .label("target_addr", alpha_addr.to_string()) .value(1u64) .assert_in(&metrics) .await; + tracing::info!("Connection closed"); // Start a new request to the destination, now that the server is dead. // This request should be waiting at the balancer for a ready endpoint. // // The only one it knows about is dead, so it won't have progressed. - let fut = client.request(client.request_builder("/bye")); + tracing::info!("Sending request"); + let fut = client.request(client.request_builder("/")); // When we tell the balancer about a new endpoint, it should have added // it and then dispatched the request... - dst.send_addr(srv2.addr); + tracing::info!(%beta.addr, "Adding"); + dst.send_addr(beta.addr); - let res = fut.await.expect("/bye response"); + let res = fut.await.expect("beta response"); assert_eq!(res.status(), http::StatusCode::OK); + assert_eq!( + String::from_utf8( + hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(), + ) + .unwrap(), + "beta" + ); } }