Implement Agent service (#744)
* Remove shutdown_rx from downstream loop

* Improve missing gameserver warning

* Add name as property in warning

* Implement Agent service

* use random port in test

* Add backtrace and nocapture to test

* * Upgrade to 50ms delay. (#752)

---------

Co-authored-by: Mark Mandel <[email protected]>
XAMPPRocky and markmandel authored Jun 15, 2023
1 parent 8c2bc21 commit 57244b4
Showing 11 changed files with 467 additions and 102 deletions.
2 changes: 1 addition & 1 deletion build/Makefile
@@ -86,7 +86,7 @@ test-quilkin: ensure-build-image
docker run --rm $(common_rust_args) \
--entrypoint=cargo $(BUILD_IMAGE_TAG) fmt -- --check
docker run --rm $(common_rust_args) \
--entrypoint=cargo $(BUILD_IMAGE_TAG) test
-e RUST_BACKTRACE=1 --entrypoint=cargo $(BUILD_IMAGE_TAG) test -- --nocapture

# Run tests against the examples
test-examples: ensure-build-image
48 changes: 48 additions & 0 deletions docs/src/services/agent.md
@@ -0,0 +1,48 @@
# Control Plane Agent

| Services | Ports | Protocol           |
|----------|-------|--------------------|
| QCMP     | 7600  | UDP (IPv4 && IPv6) |

> **Note:** This service is currently in active experimentation and development,
> so there may be bugs which make it unusable for production. As always, all bug
> reports are welcome and appreciated.

For multi-cluster integration, Quilkin provides an `agent` service that can be
deployed to a cluster to act as a beacon for QCMP pings and to forward that
cluster's configuration information to a `relay` service.

To view all options for the `agent` subcommand, run:

```shell
$ quilkin agent --help
{{#include ../../../target/quilkin.agent.commands}}
```

## Quickstart
The simplest version of the `agent` service is running `quilkin agent` with no
arguments; this sets up just the QCMP service, allowing the agent to be pinged
to measure round-trip time (RTT).

```shell
quilkin agent
```
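
The QCMP port defaults to `7600` (UDP); judging from the flags defined in
`src/cli/agent.rs` below, it can be overridden with the `--qcmp-port` flag or
the `QCMP_PORT` environment variable:

```shell
quilkin agent --qcmp-port 7601
```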

To run an agent alongside the relay (see the [`relay` quickstart](./relay.md#quickstart)
for more information), specify the relay endpoint with the `--relay` flag **and**
provide a configuration discovery provider, such as a configuration file or Agones.

```shell
quilkin --admin-address http://localhost:8001 agent --relay http://localhost:7900 file quilkin.yaml
```
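
To discover endpoints from Agones instead of a static file, swap the provider
subcommand. The Agones provider's options are not part of this excerpt, so the
line below is only a sketch; consult `quilkin agent agones --help` for the real
flags.

```shell
quilkin --admin-address http://localhost:8001 agent --relay http://localhost:7900 agones
```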

Now if we run cURL against both the agent and the relay, we should see that
they both serve the same set of endpoints.

```bash
# Check Agent
curl localhost:8001/config
# Check Relay
curl localhost:8000/config
```
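
To compare the two responses more precisely, one option is to normalise and
diff them. This is only a sketch that assumes `jq` is installed; since the
exact shape of the `/config` JSON isn't shown here, you may want to narrow the
filter to just the cluster/endpoint section.

```shell
# Sort keys with jq so the diff only shows real differences.
diff <(curl -s localhost:8001/config | jq -S .) \
     <(curl -s localhost:8000/config | jq -S .)
```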
16 changes: 13 additions & 3 deletions src/cli.rs
@@ -25,7 +25,8 @@ use tokio::{signal, sync::watch};
use crate::{admin::Mode, Config};

pub use self::{
generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy, relay::Relay,
agent::Agent, generate_config_schema::GenerateConfigSchema, manage::Manage, proxy::Proxy,
relay::Relay,
};

macro_rules! define_port {
@@ -38,6 +39,7 @@ macro_rules! define_port {
};
}

pub mod agent;
pub mod generate_config_schema;
pub mod manage;
pub mod proxy;
@@ -70,16 +72,17 @@ pub struct Cli {
/// The various Quilkin commands.
#[derive(Clone, Debug, clap::Subcommand)]
pub enum Commands {
Proxy(Proxy),
Agent(Agent),
GenerateConfigSchema(GenerateConfigSchema),
Manage(Manage),
Proxy(Proxy),
Relay(Relay),
}

impl Commands {
pub fn admin_mode(&self) -> Option<Mode> {
match self {
Self::Proxy(_) => Some(Mode::Proxy),
Self::Proxy(_) | Self::Agent(_) => Some(Mode::Proxy),
Self::Relay(_) | Self::Manage(_) => Some(Mode::Xds),
Self::GenerateConfigSchema(_) => None,
}
@@ -148,6 +151,13 @@ impl Cli {
let fut = tryhard::retry_fn({
let shutdown_rx = shutdown_rx.clone();
move || match self.command.clone() {
Commands::Agent(agent) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
tokio::spawn(
async move { agent.run(config.clone(), shutdown_rx.clone()).await },
)
}
Commands::Proxy(runner) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
101 changes: 101 additions & 0 deletions src/cli/agent.rs
@@ -0,0 +1,101 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::sync::Arc;

use crate::config::Config;

define_port!(7600);

/// Runs Quilkin as an agent service: it acts as a beacon for QCMP pings and
/// forwards cluster and configuration information discovered from a provider
/// to one or more `quilkin relay` services.
#[derive(clap::Args, Clone, Debug)]
pub struct Agent {
/// Port for QCMP service.
#[clap(short, long, env = "QCMP_PORT", default_value_t = PORT)]
pub qcmp_port: u16,
/// One or more `quilkin relay` endpoints to push configuration changes to.
#[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER")]
pub relay: Vec<tonic::transport::Endpoint>,
/// The `region` to set in the cluster map for any provider
/// endpoints discovered.
#[clap(long, env = "QUILKIN_REGION")]
pub region: Option<String>,
/// The `zone` in the `region` to set in the cluster map for any provider
/// endpoints discovered.
#[clap(long, env = "QUILKIN_ZONE")]
pub zone: Option<String>,
/// The `sub_zone` in the `zone` in the `region` to set in the cluster map
/// for any provider endpoints discovered.
#[clap(long, env = "QUILKIN_SUB_ZONE")]
pub sub_zone: Option<String>,
/// The configuration source for a management server.
#[clap(subcommand)]
pub provider: Option<crate::config::Providers>,
}

impl Default for Agent {
fn default() -> Self {
Self {
qcmp_port: PORT,
relay: <_>::default(),
region: <_>::default(),
zone: <_>::default(),
sub_zone: <_>::default(),
provider: <_>::default(),
}
}
}

impl Agent {
pub async fn run(
&self,
config: Arc<Config>,
mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some())
.then(|| crate::endpoint::Locality {
region: self.region.clone().unwrap_or_default(),
zone: self.zone.clone().unwrap_or_default(),
sub_zone: self.sub_zone.clone().unwrap_or_default(),
});

let _mds_task = if !self.relay.is_empty() {
let _provider_task = match self.provider.as_ref() {
Some(provider) => Some(provider.spawn(config.clone(), locality.clone())),
None => return Err(eyre::eyre!("no configuration provider given")),
};

let task = crate::xds::client::MdsClient::connect(
String::clone(&config.id.load()),
self.relay.clone(),
);

tokio::select! {
result = task => Some(result?.mds_client_stream(config.clone())),
_ = shutdown_rx.changed() => return Ok(()),
}
} else {
tracing::info!("no relay servers given");
None
};

crate::protocol::spawn(self.qcmp_port).await?;
shutdown_rx.changed().await.map_err(From::from)
}
}
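
For orientation, here is a minimal sketch (not part of this commit) of driving the new `Agent` type from code. It assumes `Agent`, `Config`, and the crate's `Result` alias are publicly reachable as `quilkin::cli::Agent`, `quilkin::Config`, and `quilkin::Result`, which the diff above does not confirm.

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() -> quilkin::Result<()> {
    // Shutdown channel, mirroring the watch channel the CLI passes into `run`.
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());

    // Default agent: QCMP on port 7600, no relay endpoints, no provider.
    let agent = quilkin::cli::Agent::default();
    let config = Arc::new(quilkin::Config::default());

    // Trigger shutdown on Ctrl-C.
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
        let _ = shutdown_tx.send(());
    });

    agent.run(config, shutdown_rx).await
}
```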
20 changes: 4 additions & 16 deletions src/cli/proxy.rs
@@ -14,9 +14,8 @@
* limitations under the License.
*/

use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};

use tokio::{sync::watch, time::Duration};
use tonic::transport::Endpoint;

use crate::{proxy::SessionMap, utils::net, xds::ResourceType, Config, Result};
@@ -115,7 +114,7 @@ impl Proxy {
None
};

self.run_recv_from(&config, sessions.clone(), shutdown_rx.clone())?;
self.run_recv_from(&config, sessions.clone())?;
tracing::info!("Quilkin is ready");

shutdown_rx
@@ -138,12 +137,7 @@ impl Proxy {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
fn run_recv_from(
&self,
config: &Arc<Config>,
sessions: SessionMap,
shutdown_rx: watch::Receiver<()>,
) -> Result<()> {
fn run_recv_from(&self, config: &Arc<Config>, sessions: SessionMap) -> Result<()> {
// The number of worker tasks to spawn. Each task gets a dedicated queue to
// consume packets off.
let num_workers = num_cpus::get();
@@ -155,7 +149,6 @@ impl Proxy {
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
socket: socket.clone(),
shutdown_rx: shutdown_rx.clone(),
config: config.clone(),
sessions: sessions.clone(),
})
@@ -315,7 +308,6 @@ mod tests {

let socket = Arc::new(create_socket().await);
let addr = socket.local_addr().unwrap();
let (_shutdown_tx, shutdown_rx) = watch::channel(());
let endpoint = t.open_socket_and_recv_single_packet().await;
let msg = "hello";
let config = Arc::new(Config::default());
@@ -329,7 +321,6 @@
socket: socket.clone(),
config,
sessions: <_>::default(),
shutdown_rx,
}
.spawn();

@@ -348,7 +339,6 @@
#[tokio::test]
async fn run_recv_from() {
let t = TestHelper::default();
let (_shutdown_tx, shutdown_rx) = watch::channel(());

let msg = "hello";
let endpoint = t.open_socket_and_recv_single_packet().await;
@@ -363,9 +353,7 @@
clusters.insert_default(vec![endpoint.socket.local_addr().unwrap()])
});

proxy
.run_recv_from(&config, <_>::default(), shutdown_rx)
.unwrap();
proxy.run_recv_from(&config, <_>::default()).unwrap();

let socket = create_socket().await;
socket.send_to(msg.as_bytes(), &local_addr).await.unwrap();
13 changes: 6 additions & 7 deletions src/config/providers/k8s.rs
@@ -145,12 +145,7 @@ pub fn update_endpoints_from_gameservers(
}

Event::Deleted(server) => {
let found = if let Some(status) = &server.status {
let port = status.ports.as_ref()
.and_then(|ports| ports.first().map(|status| status.port))
.unwrap_or_default();

let endpoint = Endpoint::from((status.address.clone(), port));
let found = if let Some(endpoint) = server.endpoint() {
config.clusters.value().remove_endpoint(&endpoint)
} else {
config.clusters.value().remove_endpoint_if(|endpoint| {
@@ -159,7 +154,11 @@
};

if found.is_none() {
tracing::warn!(?server, "received unknown gameserver to delete from k8s");
tracing::warn!(
endpoint=%serde_json::to_value(server.endpoint()).unwrap(),
name=%serde_json::to_value(server.metadata.name).unwrap(),
"received unknown gameserver to delete from k8s"
);
}
}
};
