Skip to content

Commit

Permalink
Merge branch 'main' into ep/retain-endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Sep 7, 2023
2 parents f67a844 + d653025 commit f736060
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 119 deletions.
66 changes: 33 additions & 33 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,71 +54,71 @@ async-stream = "0.3.5"
base64.workspace = true
base64-serde = "0.7.0"
bytes = { version = "1.4.0", features = ["serde"] }
cached = "0.43.0"
chrono = "0.4.24"
clap = { version = "4.3.0", features = ["cargo", "derive", "env"] }
dashmap = { version = "5.4.0", features = ["serde"] }
cached = "0.45.0"
chrono = "0.4.28"
clap = { version = "4.4.2", features = ["cargo", "derive", "env"] }
dashmap = { version = "5.5.3", features = ["serde"] }
dirs2 = "3.0.1"
either = "1.8.1"
enum-map = "2.5.0"
either = "1.9.0"
enum-map = "2.6.1"
eyre = "0.6.8"
futures.workspace = true
hyper = { version = "0.14.26", features = ["http2"] }
hyper-rustls = { version = "0.24.0", features = ["http2", "webpki-roots"] }
hyper = { version = "0.14.27", features = ["http2"] }
hyper-rustls = { version = "0.24.1", features = ["http2", "webpki-roots"] }
ipnetwork = "0.20.0"
k8s-openapi.workspace = true
maxminddb = "0.23.0"
notify = "6.0.0"
num_cpus = "1.15.0"
once_cell = "1.17.1"
notify = "6.1.1"
num_cpus = "1.16.0"
once_cell = "1.18.0"
parking_lot = "0.12.1"
prometheus = { version = "0.13.3", default-features = false }
prost = "0.11.9"
prost-types = "0.11.9"
prost = "0.12.0"
prost-types = "0.12.0"
rand = "0.8.5"
regex = "1.8.2"
schemars = { version = "0.8.12", features = ["chrono", "bytes", "url"] }
serde = { version = "1.0.163", features = ["derive", "rc"] }
serde_json = "1.0.96"
regex = "1.9.5"
schemars = { version = "0.8.13", features = ["chrono", "bytes", "url"] }
serde = { version = "1.0.188", features = ["derive", "rc"] }
serde_json = "1.0.105"
serde_regex = "1.1.0"
serde_stacker = "0.1.8"
serde_yaml = "0.9.21"
serde_stacker = "0.1.10"
serde_yaml = "0.9.25"
snap = "1.1.0"
socket2 = { version = "0.5.3", features = ["all"] }
stable-eyre = "0.2.2"
thiserror = "1.0.40"
thiserror = "1.0.48"
tokio.workspace = true
tokio-stream = { version = "0.1.14", features = ["sync"] }
tonic = "0.9.2"
tonic = "0.10.0"
tracing.workspace = true
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] }
tryhard = "0.5.0"
url = { version = "2.3.1", features = ["serde"] }
uuid = { version = "1.3.3", default-features = false, features = ["v4"] }
tryhard = "0.5.1"
url = { version = "2.4.1", features = ["serde"] }
uuid = { version = "1.4.1", default-features = false, features = ["v4"] }
lasso = { version = "0.7.2", features = ["multi-threaded"] }
kube.workspace = true
trust-dns-resolver = { version = "0.23.0", features = ["tokio", "tokio-rustls", "dns-over-https-rustls"] }
async-trait = "0.1.68"
async-trait = "0.1.73"
nom = "7.1.3"
itertools = "0.11.0"

[target.'cfg(target_os = "linux")'.dependencies]
sys-info = "0.9.1"

[dev-dependencies]
regex = "1.8.2"
criterion = { version = "0.5.0", features = ["html_reports"] }
once_cell = "1.17.1"
regex = "1.9.5"
criterion = { version = "0.5.1", features = ["html_reports"] }
once_cell = "1.18.0"
tracing-test = "0.2.4"
pretty_assertions = "1.3.0"
tempfile = "3.5.0"
pretty_assertions = "1.4.0"
tempfile = "3.8.0"
rand = "0.8.5"

[build-dependencies]
tonic-build = { version = "0.9.2", default_features = false, features = ["transport", "prost"] }
prost-build = "0.11.9"
built = { version = "0.6.0", features = ["git2"] }
tonic-build = { version = "0.10.0", default_features = false, features = ["transport", "prost"] }
prost-build = "0.12.0"
built = { version = "0.6.1", features = ["git2"] }
protobuf-src = { version = "1.1.0", optional = true }

[features]
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
[![Discord](https://img.shields.io/discord/773975408265134100)](https://discord.gg/mfBNZjBDnc)
[![Twitter Follow](https://img.shields.io/twitter/follow/quilkindev?style=social)](https://twitter.com/quilkindev)

Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game servers
deployments, to ensure security, access control, telemetry data, metrics and more.

It is designed to be used behind game clients as well as in front of dedicated game servers.
Quilkin is a non-transparent UDP proxy specifically designed for use with large scale multiplayer dedicated game
servers deployments that ensures security, access control, telemetry data, metrics and more without the end user
having to custom build and integrate this functionality into their game clients and servers directly.

## Announcements

Expand Down
4 changes: 2 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.map(|name| std::env::current_dir().unwrap().join(name))
.collect::<Vec<_>>();

let include_dirs = vec![
let include_dirs = [
"proto/data-plane-api",
"proto/udpa",
"proto/googleapis",
Expand Down Expand Up @@ -83,7 +83,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

// This tells cargo to re-run this build script only when the proto files
// we're interested in change or the any of the proto directories were updated.
for path in vec![proto_files, include_dirs].concat() {
for path in [proto_files, include_dirs].concat() {
println!("cargo:rerun-if-changed={}", path.to_str().unwrap());
}

Expand Down
3 changes: 2 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ exceptions = [
# Each entry is the crate and version constraint, and its specific allow
# list
{ name ="webpki-roots", version = "0.25.0", allow = ["MPL-2.0"] },
{ name ="webpki-roots", version = "0.23.0", allow = ["MPL-2.0"] }
{ name ="webpki-roots", version = "0.23.0", allow = ["MPL-2.0"] },
{ name ="option-ext", version = "0.2.0", allow = ["MPL-2.0"] }
]

[[licenses.clarify]]
Expand Down
20 changes: 10 additions & 10 deletions examples/quilkin-filter-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ edition = "2021"
# If lifting this example, you will want to be explicit about the Quilkin version, e.g.
# quilkin = "0.2.0"
quilkin = { path = "../../" }
tokio = { version = "1.21.2", features = ["full"] }
tonic = "0.8.2"
prost = "0.11.2"
prost-types = "0.11.2"
serde = "1.0.147"
serde_yaml = "0.9.14"
bytes = "1.2.1"
schemars = "0.8.11"
async-trait = "0.1.68"
tokio = { version = "1.32", features = ["full"] }
tonic = "0.10"
prost = "0.12"
prost-types = "0.12"
serde = "1"
serde_yaml = "0.9"
bytes = "1"
schemars = "0.8"
async-trait = "0.1"

[build-dependencies]
prost-build = "0.11.2"
prost-build = "0.12"
protobuf-src = "1.1.0"
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# limitations under the License.

[toolchain]
channel = "1.71.1"
channel = "1.72.0"
components = ["rustfmt", "clippy"]
7 changes: 6 additions & 1 deletion src/cli/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ impl Ping {
.await
.unwrap();

let Ok(socket_result) = tokio::time::timeout(std::time::Duration::from_secs(1), socket.recv_from(&mut buf)).await else {
let Ok(socket_result) = tokio::time::timeout(
std::time::Duration::from_secs(1),
socket.recv_from(&mut buf),
)
.await
else {
tracing::error!(endpoint=%self.endpoint, "exceeded timeout duration");
continue;
};
Expand Down
2 changes: 1 addition & 1 deletion src/filters/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ policy: RANDOM
([127, 0, 0, 3], 8080).into(),
];
let source_ips = vec![[127u8, 1, 1, 1], [127, 2, 2, 2], [127, 3, 3, 3]];
let source_ports = vec![11111u16, 22222, 33333, 44444, 55555];
let source_ports = [11111u16, 22222, 33333, 44444, 55555];

let yaml = "policy: HASH";
let filter = LoadBalancer::from_config(serde_yaml::from_str(yaml).unwrap());
Expand Down
1 change: 1 addition & 0 deletions src/filters/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Timestamp {
};

// Create a normal DateTime from the NaiveDateTime
#[allow(deprecated)]
let datetime: DateTime<Utc> = DateTime::from_utc(naive, Utc);

let now = Utc::now();
Expand Down
17 changes: 14 additions & 3 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ pub async fn spawn(port: u16) -> crate::Result<()> {
}
};

let Protocol::Ping { client_timestamp, nonce, } = command else {
let Protocol::Ping {
client_timestamp,
nonce,
} = command
else {
tracing::warn!("rejected unsupported QCMP packet");
continue;
};
Expand Down Expand Up @@ -206,7 +210,13 @@ impl Protocol {
///
/// [ntp]: https://en.wikipedia.org/wiki/Network_Time_Protocol#Clock_synchronization_algorithm
pub fn round_trip_delay(&self, client_response_timestamp: i64) -> Option<i64> {
let Protocol::PingReply { client_timestamp, server_start_timestamp, server_transmit_timestamp, .. } = self else {
let Protocol::PingReply {
client_timestamp,
server_start_timestamp,
server_transmit_timestamp,
..
} = self
else {
return None;
};

Expand Down Expand Up @@ -243,7 +253,8 @@ impl Protocol {
/// is not a QCMP packet, and returning `Err` when it was detected as a
/// QCMP packet, but there was an error in parsing the payload.
pub fn parse(input: &[u8]) -> Result<Option<Self>> {
let Ok((input, _)) = complete::tag::<_, _, nom::error::Error<_>>(MAGIC_NUMBER)(input) else {
let Ok((input, _)) = complete::tag::<_, _, nom::error::Error<_>>(MAGIC_NUMBER)(input)
else {
return Ok(None);
};

Expand Down
27 changes: 10 additions & 17 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ use tokio::net::UdpSocket;
use crate::{
endpoint::{Endpoint, EndpointAddress},
filters::{Filter, ReadContext},
ttl_map::TryResult,
Config,
};

pub use sessions::{Session, SessionArgs, SessionKey, SessionMap};
pub use sessions::{Session, SessionKey, SessionMap};

/// Packet received from local port
#[derive(Debug)]
Expand Down Expand Up @@ -203,25 +202,21 @@ impl DownstreamReceiveWorkerConfig {
dest: endpoint.address.clone(),
};

let send_future = match sessions.try_get(&session_key) {
TryResult::Present(entry) => entry.send(packet),
TryResult::Absent => {
let session_args = SessionArgs {
config: config.clone(),
source: session_key.source.clone(),
downstream_socket: downstream_socket.clone(),
dest: endpoint.clone(),
let send_future = match sessions.get(&session_key) {
Some(entry) => entry.send(packet),
None => {
let session = Session::new(
config.clone(),
session_key.source.clone(),
downstream_socket.clone(),
endpoint.clone(),
asn_info,
};
)?;

let session = session_args.into_session().await?;
let future = session.send(packet);
sessions.insert(session_key, session);
future
}
TryResult::Locked => {
return Err(PipelineError::SessionMapLocked);
}
};

send_future.await
Expand All @@ -232,8 +227,6 @@ impl DownstreamReceiveWorkerConfig {
pub enum PipelineError {
#[error("No upstream endpoints available")]
NoUpstreamEndpoints,
#[error("session map was locked")]
SessionMapLocked,
#[error("filter {0}")]
Filter(#[from] crate::filters::FilterError),
#[error("qcmp: {0}")]
Expand Down
Loading

0 comments on commit f736060

Please sign in to comment.