test: Cleanup outbound_balancer_waits_for_ready_endpoint (#2529)
The outbound_balancer_waits_for_ready_endpoint test doesn't reliably
demonstrate the behavior it is trying to test: it sets up two identical
endpoints, so the test can't be sure which endpoint a response came
from.

This change updates the test to provide backends that return distinct
responses so we can verify the desired behavior.

This change also improves the test's logging. The discovery test
infrastructure is updated to allow overriding the logical service
address, which helps disambiguate addresses in logs.
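The core idea, as a minimal sketch (server::http2 is the integration
crate's test-server builder, and the route bodies are the ones the
updated test uses in the diff below):

    // Two backends with distinct bodies, so an assertion can attribute
    // a response to a specific endpoint.
    let alpha = server::http2().route("/", "alpha").run().await;
    let beta = server::http2().route("/", "beta").run().await;
    // After failing over from `alpha` to `beta`, the test can require
    // that the response body is exactly "beta".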
olix0r authored Nov 20, 2023
1 parent 8ce2df4 commit 9c56e91
Showing 1 changed file with 53 additions and 44 deletions.
97 changes: 53 additions & 44 deletions linkerd/app/integration/src/tests/discovery.rs
@@ -1,5 +1,6 @@
 use crate::*;
 use linkerd2_proxy_api as pb;
+use tokio::time::sleep;

 const HOST: &str = "disco.test.svc.cluster.local";

@@ -51,7 +52,7 @@ mod cross_version {

         let dstctl = controller::new();
         let polctl = controller::policy();
-        let _txs = send_default_dst(&dstctl, &polctl, &srv);
+        let _txs = send_default_dst(&dstctl, &polctl, &srv, None);

         let proxy = proxy::new()
             .controller(dstctl.run().await)
@@ -77,7 +78,7 @@

         let dstctl = controller::new();
         let polctl = controller::policy();
-        let (_profile, _policy, _dst) = send_default_dst(&dstctl, &polctl, &srv);
+        let (_profile, _policy, _dst) = send_default_dst(&dstctl, &polctl, &srv, None);
         dstctl.no_more_destinations();

         let proxy = proxy::new()
@@ -106,11 +107,11 @@ mod cross_version {

         let dstctl = controller::new();
         let polctl = controller::policy();
-        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv);
+        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv, None);

         drop(dst);

-        let dst = dstctl.destination_tx(default_dst_name(&srv));
+        let dst = dstctl.destination_tx(default_dst_name(srv.addr.port()));
         dst.send_addr(srv.addr);

         let proxy = proxy::new()
@@ -149,7 +150,7 @@ mod cross_version {

         let dstctl = controller::new();
         let polctl = controller::policy();
-        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv);
+        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv, None);

         dst.send(up);

@@ -294,7 +295,7 @@ mod cross_version {
         dst_tx1.send(up);

         // Wait for the reconnect to happen. TODO: Replace this flaky logic.
-        tokio::time::sleep(Duration::from_millis(1000)).await;
+        sleep(Duration::from_secs(1)).await;

         let rsp = initially_exists
             .request(initially_exists.request_builder("/"))
@@ -314,11 +315,12 @@ mod cross_version {
         let dstctl = controller::new();
         let _profile = dstctl.profile_tx_default(srv.addr, HOST);

-        let polctl = controller::policy().outbound_default(srv.addr, default_dst_name(&srv));
+        let polctl =
+            controller::policy().outbound_default(srv.addr, default_dst_name(srv.addr.port()));

         // when the proxy requests the destination, don't respond.
-        let _dst_tx = dstctl.destination_tx(default_dst_name(&srv));
-        let _txs = send_default_dst(&dstctl, &polctl, &srv);
+        let _dst_tx = dstctl.destination_tx(default_dst_name(srv.addr.port()));
+        let _txs = send_default_dst(&dstctl, &polctl, &srv, None);

         let proxy = proxy::new()
             .controller(dstctl.run().await)
@@ -347,7 +349,7 @@ mod cross_version {

         let dstctl = controller::new();
         let polctl = controller::policy();
-        let _txs = send_default_dst(&dstctl, &polctl, &srv);
+        let _txs = send_default_dst(&dstctl, &polctl, &srv, None);

         let proxy = proxy::new()
             .controller(
@@ -362,12 +364,12 @@ mod cross_version {
             .await;

         // Allow the control client to notice a connection error
-        tokio::time::sleep(Duration::from_millis(500)).await;
+        sleep(Duration::from_millis(500)).await;

         // Allow our controller to start accepting connections,
         // and then wait a little bit so the client tries again.
         drop(tx);
-        tokio::time::sleep(Duration::from_millis(500)).await;
+        sleep(Duration::from_millis(500)).await;

         let client = (test.mk_client)(&proxy, HOST);

@@ -378,28 +380,30 @@ mod cross_version {
     }
 }

-fn default_dst_name(srv: &server::Listening) -> String {
-    format!("{}:{}", HOST, srv.addr.port())
+fn default_dst_name(port: u16) -> String {
+    format!("{}:{}", HOST, port)
 }

 fn send_default_dst(
     dstctl: &controller::Controller,
     polctl: &policy::Controller,
     srv: &server::Listening,
+    svc: Option<SocketAddr>,
 ) -> (
     controller::ProfileSender,
     policy::OutboundSender,
     controller::DstSender,
 ) {
-    let addr = srv.addr;
-    let name = default_dst_name(srv);
-    tracing::info!("Configuring resolution for {addr} {name}");
+    let svc_addr = svc.unwrap_or(srv.addr);
+    let srv_addr = srv.addr;
+    let name = default_dst_name(svc_addr.port());
+    tracing::info!("Configuring resolution for {svc_addr} {name}");

-    let policy = polctl.outbound_tx_default(addr, name.clone());
-    let profile = dstctl.profile_tx_default(addr, HOST);
+    let policy = polctl.outbound_tx_default(svc_addr, name.clone());
+    let profile = dstctl.profile_tx_default(svc_addr, HOST);

     let dst = dstctl.destination_tx(name);
-    dst.send_addr(addr);
+    dst.send_addr(srv_addr);

     (profile, policy, dst)
 }
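For orientation, a hedged sketch of how a test calls the updated helper
(dstctl, polctl, and srv as in the tests above; the 127.0.0.1:8080 value
mirrors the one used in the rewritten test below):

    // A logical (virtual) service address that no server actually binds.
    let svc_addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    // Profile and policy are registered under `svc_addr`, while the
    // destination update still announces the concrete `srv.addr`, so the
    // two addresses can be told apart in logs.
    let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv, Some(svc_addr));
    // Passing `None` instead keeps the old behavior: logical == srv.addr.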
@@ -422,66 +426,71 @@ mod http2 {
         outbound_error_reconnects_after_backoff
     }

-    #[tokio::test]
+    /// See https://github.com/linkerd/linkerd2/issues/2550
+    #[tokio::test(flavor = "current_thread")]
     async fn outbound_balancer_waits_for_ready_endpoint() {
-        // See https://github.com/linkerd/linkerd2/issues/2550
         let _t = trace_init();

-        let srv1 = server::http2()
-            .route("/", "hello")
-            .route("/bye", "bye")
-            .run()
-            .await;
-
-        let srv2 = server::http2()
-            .route("/", "hello")
-            .route("/bye", "bye")
-            .run()
-            .await;
-        let srv1_addr = srv1.addr;
+        let svc_addr = SocketAddr::from(([127, 0, 0, 1], 8080));
+        let alpha = server::http2().route("/", "alpha").run().await;
+        let beta = server::http2().route("/", "beta").run().await;

         // Start with the first server.
         let dstctl = controller::new();
         let polctl = controller::policy();
-        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &srv1);
+        let (_profile, _policy, dst) = send_default_dst(&dstctl, &polctl, &alpha, Some(svc_addr));

         let proxy = proxy::new()
-            .outbound_ip(srv1.addr)
+            .outbound_ip(svc_addr)
             .controller(dstctl.run().await)
             .policy(polctl.run().await)
             .run()
             .await;
         let client = client::http2(proxy.outbound, HOST);
         let metrics = client::http1(proxy.admin, "localhost");

-        assert_eq!(client.get("/").await, "hello");
+        assert_eq!(client.get("/").await, "alpha");

         // Simulate the first server falling over without discovery
         // knowing about it...
-        srv1.join().await;
-        tokio::task::yield_now().await;
+        tracing::info!(%alpha.addr, "Stopping");
+        let alpha_addr = alpha.addr;
+        alpha.join().await;

-        // Wait until the proxy has seen the `srv1` disconnect...
+        // Wait until the proxy has seen the `alpha` disconnect...
         metrics::metric("tcp_close_total")
             .label("peer", "dst")
             .label("direction", "outbound")
-            .label("target_addr", srv1_addr.to_string())
+            .label("target_addr", alpha_addr.to_string())
             .value(1u64)
             .assert_in(&metrics)
             .await;
+        tracing::info!("Connection closed");

         // Start a new request to the destination, now that the server is dead.
         // This request should be waiting at the balancer for a ready endpoint.
         //
         // The only one it knows about is dead, so it won't have progressed.
-        let fut = client.request(client.request_builder("/bye"));
+        tracing::info!("Sending request");
+        let fut = client.request(client.request_builder("/"));

         // When we tell the balancer about a new endpoint, it should have added
         // it and then dispatched the request...
-        dst.send_addr(srv2.addr);
+        tracing::info!(%beta.addr, "Adding");
+        dst.send_addr(beta.addr);

-        let res = fut.await.expect("/bye response");
+        let res = fut.await.expect("beta response");
         assert_eq!(res.status(), http::StatusCode::OK);
+        assert_eq!(
+            String::from_utf8(
+                hyper::body::to_bytes(res.into_body())
+                    .await
+                    .unwrap()
+                    .to_vec(),
+            )
+            .unwrap(),
+            "beta"
+        );
     }
 }
