Skip to content

Commit

Permalink
Merge branch 'main' into auto-schema
Browse files Browse the repository at this point in the history
  • Loading branch information
markmandel authored Feb 5, 2022
2 parents b8dd915 + 2fdc8ac commit eb1777f
Show file tree
Hide file tree
Showing 26 changed files with 1,047 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ quilkin-macros = { version = "0.3.0-dev", path = "./macros" }
# Crates.io
base64 = "0.13.0"
base64-serde = "0.6.1"
bytes = "1.1.0"
bytes = { version = "1.1.0", features = ["serde"] }
clap = { version = "3", features = ["cargo", "derive", "env"] }
dashmap = "4.0.2"
either = "1.6.1"
Expand Down
172 changes: 162 additions & 10 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::net::UdpSocket;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::{atomic, mpsc, Arc};
use std::thread::sleep;
use std::time;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use once_cell::sync::Lazy;

use quilkin::config::Admin;

const MESSAGE_SIZE: usize = 0xffff;
const DEFAULT_MESSAGE: [u8; 0xffff] = [0xff; 0xffff];
const BENCH_LOOP_ADDR: &str = "127.0.0.1:8002";
Expand All @@ -19,16 +24,19 @@ const PACKETS: &[&[u8]] = &[
&[0xffu8; 1500],
];

static SERVER_INIT: Lazy<()> = Lazy::new(|| {
std::thread::spawn(|| {
/// Run and instance of quilkin that sends and received data
/// from the given address.
fn run_quilkin(port: u16, endpoint: SocketAddr) {
std::thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = quilkin::config::Builder::empty()
.with_port(8000)
.with_port(port)
.with_admin(Admin {
address: "[::]:0".parse().unwrap(),
})
.with_static(
vec![],
vec![quilkin::endpoint::Endpoint::new(
FEEDBACK_LOOP_ADDR.parse().unwrap(),
)],
vec![quilkin::endpoint::Endpoint::new(endpoint.into())],
)
.build();
let server = quilkin::Builder::from(std::sync::Arc::new(config))
Expand All @@ -41,6 +49,10 @@ static SERVER_INIT: Lazy<()> = Lazy::new(|| {
server.run(shutdown_rx).await.unwrap();
});
});
}

static THROUGHPUT_SERVER_INIT: Lazy<()> = Lazy::new(|| {
run_quilkin(8000, FEEDBACK_LOOP_ADDR.parse().unwrap());
});

static FEEDBACK_LOOP: Lazy<()> = Lazy::new(|| {
Expand All @@ -61,9 +73,9 @@ static FEEDBACK_LOOP: Lazy<()> = Lazy::new(|| {
});
});

fn criterion_benchmark(c: &mut Criterion) {
fn throughput_benchmark(c: &mut Criterion) {
Lazy::force(&FEEDBACK_LOOP);
Lazy::force(&SERVER_INIT);
Lazy::force(&THROUGHPUT_SERVER_INIT);
// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
let socket = UdpSocket::bind(BENCH_LOOP_ADDR).unwrap();
Expand Down Expand Up @@ -98,5 +110,145 @@ fn criterion_benchmark(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, criterion_benchmark);
const WRITE_LOOP_ADDR: &str = "127.0.0.1:8003";
const READ_LOOP_ADDR: &str = "127.0.0.1:8004";

const READ_QUILKIN_PORT: u16 = 9001;
static READ_SERVER_INIT: Lazy<()> = Lazy::new(|| {
run_quilkin(READ_QUILKIN_PORT, READ_LOOP_ADDR.parse().unwrap());
});

const WRITE_QUILKIN_PORT: u16 = 9002;
static WRITE_SERVER_INIT: Lazy<()> = Lazy::new(|| {
run_quilkin(WRITE_QUILKIN_PORT, WRITE_LOOP_ADDR.parse().unwrap());
});

/// Binds a socket to `addr`, and waits for an initial packet to be sent to it to establish
/// a connection. After which any `Vec<u8>` sent to the returned channel will result in that
/// data being send via that connection - thereby skipping the proxy `read` operation.
fn write_feedback(addr: SocketAddr) -> mpsc::Sender<Vec<u8>> {
let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(addr).unwrap();
let mut packet = [0; MESSAGE_SIZE];
let (_, source) = socket.recv_from(&mut packet).unwrap();
while let Ok(packet) = write_rx.recv() {
socket.send_to(packet.as_slice(), source).unwrap();
}
});
write_tx
}

fn readwrite_benchmark(c: &mut Criterion) {
Lazy::force(&READ_SERVER_INIT);

// start a feedback server for read operations, that sends a response through a channel,
// thereby skipping a proxy connection on the return.
let (read_tx, read_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(READ_LOOP_ADDR).unwrap();
let mut packet = [0; MESSAGE_SIZE];
loop {
let (length, _) = socket.recv_from(&mut packet).unwrap();
let packet = &packet[..length];
assert_eq!(packet, &DEFAULT_MESSAGE[..length]);

if read_tx.send(packet.to_vec()).is_err() {
return;
}
}
});

// start a feedback server for a direct write benchmark.
let direct_write_addr = (Ipv4Addr::LOCALHOST, 9004).into();
let direct_write_tx = write_feedback(direct_write_addr);

// start a feedback server for a quilkin write benchmark.
let quilkin_write_addr = (Ipv4Addr::LOCALHOST, WRITE_QUILKIN_PORT);
let quilkin_write_tx = write_feedback(WRITE_LOOP_ADDR.parse().unwrap());
Lazy::force(&WRITE_SERVER_INIT);

// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));

let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();

// prime the direct write connection
socket.send_to(PACKETS[0], direct_write_addr).unwrap();

// we need to send packets at least once a minute, otherwise the endpoint session expires.
// So setting up a ping packet for the write test.
// TODO(markmandel): If we ever make session timeout configurable, we can remove this.
let ping_socket = socket.try_clone().unwrap();
let stop = Arc::new(atomic::AtomicBool::default());
let ping_stop = stop.clone();
std::thread::spawn(move || {
while !ping_stop.load(atomic::Ordering::Relaxed) {
ping_socket.send_to(PACKETS[0], quilkin_write_addr).unwrap();
sleep(time::Duration::from_secs(30));
}
});

let mut group = c.benchmark_group("readwrite");

for message in PACKETS {
group.sample_size(NUMBER_OF_PACKETS);
group.sampling_mode(criterion::SamplingMode::Flat);
group.throughput(criterion::Throughput::Bytes(message.len() as u64));

// direct read
group.bench_with_input(
BenchmarkId::new("direct-read", format!("{} bytes", message.len())),
&message,
|b, message| {
b.iter(|| {
socket.send_to(message, READ_LOOP_ADDR).unwrap();
read_rx.recv().unwrap();
})
},
);
// quilkin read
let addr = (Ipv4Addr::LOCALHOST, READ_QUILKIN_PORT);
group.bench_with_input(
BenchmarkId::new("quilkin-read", format!("{} bytes", message.len())),
&message,
|b, message| {
b.iter(|| {
socket.send_to(message, addr).unwrap();
read_rx.recv().unwrap();
})
},
);

// direct write
let mut packet = [0; MESSAGE_SIZE];
group.bench_with_input(
BenchmarkId::new("direct-write", format!("{} bytes", message.len())),
&message,
|b, message| {
b.iter(|| {
direct_write_tx.send(message.to_vec()).unwrap();
socket.recv(&mut packet).unwrap();
})
},
);

// quilkin write
let mut packet = [0; MESSAGE_SIZE];
group.bench_with_input(
BenchmarkId::new("quilkin-write", format!("{} bytes", message.len())),
&message,
|b, message| {
b.iter(|| {
quilkin_write_tx.send(message.to_vec()).unwrap();
socket.recv(&mut packet).unwrap();
})
},
);
}

stop.store(true, atomic::Ordering::Relaxed);
}

criterion_group!(benches, readwrite_benchmark, throughput_benchmark);
criterion_main!(benches);
5 changes: 3 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/data-plane-api/envoy/type/metadata/v3/metadata.proto",
"proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto",
"proto/udpa/xds/core/v3/resource_name.proto",
"proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto",
"proto/quilkin/extensions/filters/capture_bytes/v1alpha1/capture_bytes.proto",
"proto/quilkin/extensions/filters/compress/v1alpha1/compress.proto",
"proto/quilkin/extensions/filters/concatenate_bytes/v1alpha1/concatenate_bytes.proto",
"proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto",
"proto/quilkin/extensions/filters/firewall/v1alpha1/firewall.proto",
"proto/quilkin/extensions/filters/load_balancer/v1alpha1/load_balancer.proto",
"proto/quilkin/extensions/filters/local_rate_limit/v1alpha1/local_rate_limit.proto",
"proto/quilkin/extensions/filters/matches/v1alpha1/matches.proto",
"proto/quilkin/extensions/filters/token_router/v1alpha1/token_router.proto",
"proto/quilkin/extensions/filters/firewall/v1alpha1/firewall.proto",
]
.iter()
.map(|name| std::env::current_dir().unwrap().join(name))
Expand Down
43 changes: 43 additions & 0 deletions docs/src/filters/matches.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Matches

The `Matches` filter's job is to provide a mechanism to change behaviour based
on dynamic metadata. This filter behaves similarly to the `match` expression
in Rust or `switch` statements in other languages.

#### Filter name
```text
quilkin.extensions.filters.matches.v1alpha1.Matches
```

### Configuration Examples
```rust
# let yaml = "
version: v1alpha1
static:
endpoints:
- address: 127.0.0.1:26000
- address: 127.0.0.1:26001
filters:
- name: quilkin.extensions.filters.capture_bytes.v1alpha1.CaptureBytes
config:
strategy: PREFIX
metadataKey: myapp.com/token
size: 3
remove: false
- name: quilkin.extensions.filters.matches.v1alpha1.Matches
config:
on_read:
metadataKey: myapp.com/token
branches:
- value: abc
filter: quilkin.extensions.filters.concatenate_bytes.v1alpha1.ConcatenateBytes
config:
on_read: APPEND
bytes: eHl6 # "xyz"
# ";
# let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap();
# assert_eq!(config.source.get_static_filters().unwrap().len(), 1);
# quilkin::Builder::from(std::sync::Arc::new(config)).validate().unwrap();
```

View the [Matches](../../api/quilkin/filters/matches/struct.Config.html) filter documentation for more details.
4 changes: 2 additions & 2 deletions docs/src/filters/writing_custom_filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl FilterFactory for GreetFilterFactory {
fn create_filter(&self, args: CreateFilterArgs) -> Result<FilterInstance, Error> {
let config = match args.config.unwrap() {
ConfigType::Static(config) => {
serde_yaml::from_str::<Config>(serde_yaml::to_string(config).unwrap().as_str())
serde_yaml::from_str::<Config>(serde_yaml::to_string(&config).unwrap().as_str())
.unwrap()
}
ConfigType::Dynamic(_) => unimplemented!("dynamic config is not yet supported for this filter"),
Expand All @@ -273,7 +273,7 @@ has a [Dynamic][ConfigType::dynamic] variant.
```rust,ignore
let config = match args.config.unwrap() {
ConfigType::Static(config) => {
serde_yaml::from_str::<Config>(serde_yaml::to_string(config).unwrap().as_str())
serde_yaml::from_str::<Config>(serde_yaml::to_string(&config).unwrap().as_str())
.unwrap()
}
ConfigType::Dynamic(_) => unimplemented!("dynamic config is not yet supported for this filter"),
Expand Down
49 changes: 49 additions & 0 deletions proto/quilkin/extensions/filters/matches/v1alpha1/matches.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";

package quilkin.extensions.filters.matches.v1alpha1;

import "google/protobuf/wrappers.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/any.proto";

message Matches {
message Branch {
google.protobuf.Value value = 1;
google.protobuf.StringValue filter = 2;
optional google.protobuf.Any config = 3;
}

message FallthroughFilter {
google.protobuf.StringValue filter = 2;
optional google.protobuf.Any config = 3;
}

message DirectionalConfig {
google.protobuf.StringValue metadata_key = 1;
repeated Branch branches = 2;
oneof fallthrough {
google.protobuf.NullValue pass = 3;
google.protobuf.NullValue drop = 4;
FallthroughFilter filter = 5;
}
}

optional DirectionalConfig on_read = 1;
optional DirectionalConfig on_write = 2;
}
Loading

0 comments on commit eb1777f

Please sign in to comment.