Skip to content

Commit

Permalink
Refactor ClusterMap
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Sep 10, 2023
1 parent d653025 commit af7ec10
Show file tree
Hide file tree
Showing 34 changed files with 526 additions and 934 deletions.
2 changes: 1 addition & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn run_quilkin(port: u16, endpoint: SocketAddr) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = Arc::new(quilkin::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![quilkin::endpoint::Endpoint::new(endpoint.into())])
clusters.insert_default([quilkin::endpoint::Endpoint::new(endpoint.into())].into())
});

let proxy = quilkin::cli::Proxy {
Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/data-plane-api/envoy/type/metadata/v3/metadata.proto",
"proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto",
"proto/quilkin/relay/v1alpha1/relay.proto",
"proto/quilkin/config/v1alpha1/config.proto",
"proto/quilkin/filters/capture/v1alpha1/capture.proto",
"proto/quilkin/filters/compress/v1alpha1/compress.proto",
"proto/quilkin/filters/concatenate_bytes/v1alpha1/concatenate_bytes.proto",
Expand Down
41 changes: 41 additions & 0 deletions proto/quilkin/config/v1alpha1/config.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";

import "google/protobuf/struct.proto";

package quilkin.config.v1alpha1;

message ClusterMap {
repeated Cluster clusters = 1;
}

message Cluster {
Locality locality = 1;
repeated Endpoint endpoints = 2;
}

message Locality {
string region = 1;
string zone = 2;
string sub_zone = 3;
}

message Endpoint {
string host = 1;
uint32 port = 2;
google.protobuf.Struct metadata = 3;
}
10 changes: 4 additions & 6 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,10 @@ mod tests {
let response = super::check_proxy_readiness(&config);
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

let cluster = crate::cluster::Cluster::new_default(vec![vec![Endpoint::new(
(std::net::Ipv4Addr::LOCALHOST, 25999).into(),
)]
.into()]);

config.clusters.write().insert(cluster);
config
.clusters
.write()
.insert_default([Endpoint::new((std::net::Ipv4Addr::LOCALHOST, 25999).into())].into());

let response = super::check_proxy_readiness(&config);
assert_eq!(response.status(), StatusCode::OK);
Expand Down
33 changes: 16 additions & 17 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ mod tests {

use crate::{
config::{Filter, Providers},
endpoint::{Endpoint, LocalityEndpoints},
endpoint::Endpoint,
filters::{Capture, StaticFilter, TokenRouter},
};

#[tokio::test]
async fn relay_routing() {
crate::test_utils::enable_log("quilkin=trace");
let server_port = crate::test_utils::available_addr().await.port();
let server_socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, server_port))
.await
Expand Down Expand Up @@ -281,16 +282,15 @@ mod tests {
let endpoints_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();
std::fs::write(endpoints_file.path(), {
config
.clusters
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
config.clusters.write().insert_default(
[Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
crate::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
)]));
)]
.into(),
);
serde_yaml::to_string(&config).unwrap()
})
.unwrap();
Expand All @@ -315,15 +315,15 @@ mod tests {
quiet: true,
admin_address: Some((Ipv4Addr::LOCALHOST, control_plane_admin_port).into()),
config: <_>::default(),
command: Commands::Manage(Manage {
command: Commands::Agent(Agent {
relay: vec!["http://localhost:7900".parse().unwrap()],
port: 7801,
region: None,
sub_zone: None,
zone: None,
provider: Providers::File {
qcmp_port: crate::test_utils::available_addr().await.port(),
provider: Some(Providers::File {
path: endpoints_file.path().to_path_buf(),
},
}),
}),
};

Expand Down Expand Up @@ -356,16 +356,15 @@ mod tests {

tracing::info!(?token, "writing new config");
std::fs::write(endpoints_file.path(), {
config
.clusters
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
config.clusters.write().insert_default(
[Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
)]));
)]
.into(),
);
serde_yaml::to_string(&config).unwrap()
})
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/cli/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Manage {
if let Some(locality) = &locality {
config
.clusters
.modify(|map| map.update_unlocated_endpoints(locality));
.modify(|map| map.update_unlocated_endpoints(locality.clone()));
}

let provider_task = self.provider.spawn(config.clone(), locality.clone());
Expand Down
45 changes: 29 additions & 16 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ impl Proxy {

if !self.to.is_empty() {
config.clusters.modify(|clusters| {
clusters.default_cluster_mut().localities = vec![self.to.clone().into()].into();
clusters.insert(
None,
self.to
.iter()
.cloned()
.map(crate::endpoint::Endpoint::from)
.collect(),
);
});
}

Expand All @@ -109,9 +116,7 @@ impl Proxy {
let mut stream = client.xds_client_stream(config.clone());

tokio::time::sleep(std::time::Duration::from_nanos(1)).await;
stream
.discovery_request(ResourceType::Endpoint, &[])
.await?;
stream.discovery_request(ResourceType::Cluster, &[]).await?;
tokio::time::sleep(std::time::Duration::from_nanos(1)).await;
stream
.discovery_request(ResourceType::Listener, &[])
Expand Down Expand Up @@ -199,10 +204,13 @@ mod tests {

let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![
Endpoint::new(endpoint1.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_addr().unwrap().into()),
])
clusters.insert_default(
[
Endpoint::new(endpoint1.socket.local_addr().unwrap().into()),
Endpoint::new(endpoint2.socket.local_addr().unwrap().into()),
]
.into(),
);
});

t.run_server(config, proxy, None);
Expand Down Expand Up @@ -242,9 +250,9 @@ mod tests {
};
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
)])
clusters.insert_default(
[Endpoint::new(endpoint.socket.local_addr().unwrap().into())].into(),
);
});
t.run_server(config, proxy, None);

Expand Down Expand Up @@ -281,9 +289,9 @@ mod tests {
.unwrap(),
);
config.clusters.modify(|clusters| {
clusters.insert_default(vec![Endpoint::new(
endpoint.socket.local_addr().unwrap().into(),
)])
clusters.insert_default(
[Endpoint::new(endpoint.socket.local_addr().unwrap().into())].into(),
);
});
t.run_server(
config,
Expand Down Expand Up @@ -320,7 +328,7 @@ mod tests {
let msg = "hello";
let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default([endpoint.socket.local_addr().unwrap().into()].into())
});

// we'll test a single DownstreamReceiveWorkerConfig
Expand Down Expand Up @@ -358,7 +366,12 @@ mod tests {

let config = Arc::new(crate::Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
clusters.insert_default(
[crate::endpoint::Endpoint::from(
endpoint.socket.local_addr().unwrap(),
)]
.into(),
)
});

proxy.run_recv_from(&config, <_>::default()).unwrap();
Expand Down
Loading

0 comments on commit af7ec10

Please sign in to comment.