From 152d6b0febe13628e05cfc9acebbcc45d5c3b59e Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 4 Jun 2024 12:44:51 +0200 Subject: [PATCH] Upgrade omniqueue and related dependencies --- bridge/Cargo.lock | 31 ++++++---- bridge/svix-bridge-plugin-queue/Cargo.toml | 6 +- .../svix-bridge-plugin-queue/src/redis/mod.rs | 58 +++++++++---------- .../tests/it/redis_stream_consumer.rs | 6 +- 4 files changed, 52 insertions(+), 49 deletions(-) diff --git a/bridge/Cargo.lock b/bridge/Cargo.lock index e780511e5..d74ecb334 100644 --- a/bridge/Cargo.lock +++ b/bridge/Cargo.lock @@ -963,9 +963,9 @@ dependencies = [ [[package]] name = "bb8-redis" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4094bc17b933090cfded54315a86db01d67ec999583d4bab894c520f8c097d1f" +checksum = "7eb4f141b33a750b5f667c445bd8588de10b8f2b045cd2aabc040ca746fb53ae" dependencies = [ "async-trait", "bb8", @@ -1076,6 +1076,12 @@ dependencies = [ "either", ] +[[package]] +name = "bytesize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" + [[package]] name = "cache_control" version = "0.2.0" @@ -2834,9 +2840,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "google-cloud-auth" -version = "0.13.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3798ce9d99d548f28a7f942dcd546befae21c9a1310a7b54e920a7ee241aca3" +checksum = "fe54dd8b6eb2bcd5390998238bcc39d1daed4dbb70df2845832532540384fc41" dependencies = [ "async-trait", "base64 0.21.7", @@ -2894,9 +2900,9 @@ dependencies = [ [[package]] name = "google-cloud-pubsub" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1da196da473976944d408a91213bafe078e7223e10694d3f8ed36b6e210fa130" +checksum = "0b2184a5c70b994e6d77eb1c140e193e7f5fe6015e9115322fac24f7e33f003c" dependencies = [ "async-channel 1.9.0", "async-stream", @@ -3940,15 +3946,15 @@ dependencies = [ [[package]] name = "omniqueue" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cba3b7f69e420be6e32a580307e38aef7f53ac01cc09c3bb6851aec37d710894" +version = "0.2.1" +source = "git+https://github.com/svix/omniqueue-rs?rev=62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b#62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b" dependencies = [ "async-trait", "aws-config", "aws-sdk-sqs", "bb8", "bb8-redis", + "bytesize", "futures-util", "google-cloud-googleapis", "google-cloud-pubsub", @@ -3958,6 +3964,7 @@ dependencies = [ "serde_json", "svix-ksuid 0.8.0", "thiserror", + "time", "tokio", "tracing", ] @@ -4753,9 +4760,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.24.0" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" dependencies = [ "async-trait", "bytes", @@ -4771,7 +4778,7 @@ dependencies = [ "rand 0.8.5", "ryu", "sha1_smol", - "socket2 0.4.10", + "socket2 0.5.6", "tokio", "tokio-native-tls", "tokio-util", diff --git a/bridge/svix-bridge-plugin-queue/Cargo.toml b/bridge/svix-bridge-plugin-queue/Cargo.toml index 795b3c81e..ddbff8ace 100644 --- a/bridge/svix-bridge-plugin-queue/Cargo.toml +++ b/bridge/svix-bridge-plugin-queue/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -omniqueue = "0.2.0" +omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "62ca8fa5cb0ac47bbfbad4b1939bcfe7d4cdfb6b" } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } svix-bridge-types = { path = "../svix-bridge-types" } @@ -20,8 +20,8 @@ aws-config = "1.1.5" aws-sdk-sqs = "1.13.0" fastrand = "2.0.1" google-cloud-googleapis = "0.12.0" -google-cloud-pubsub = "0.23.0" +google-cloud-pubsub = "0.24.0" lapin = "2" -redis = { version = "0.24.0", features = ["tokio-comp", "streams"] } +redis = { version = "0.25.4", features = ["tokio-comp", "streams"] } tracing-subscriber = "0.3" wiremock = "0.5.18" diff --git a/bridge/svix-bridge-plugin-queue/src/redis/mod.rs b/bridge/svix-bridge-plugin-queue/src/redis/mod.rs index 122637825..c5be035d7 100644 --- a/bridge/svix-bridge-plugin-queue/src/redis/mod.rs +++ b/bridge/svix-bridge-plugin-queue/src/redis/mod.rs @@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result { .unwrap_or_else(|| format!("{}_delays", cfg.queue_key)); let delayed_lock_key = format!("{delayed_queue_key}_lock"); - backends::RedisBackend::::builder( - backends::RedisConfig { - dsn: cfg.dsn.clone(), - max_connections: cfg.max_connections, - reinsert_on_nack: cfg.reinsert_on_nack, - queue_key: cfg.queue_key.clone(), - delayed_queue_key, - delayed_lock_key, - consumer_group: cfg.consumer_group.clone(), - consumer_name: cfg.consumer_name.clone(), - // FIXME: expose in config? - payload_key: "payload".to_string(), - ack_deadline_ms: cfg.ack_deadline_ms, - }, - ) + backends::RedisBackend::builder(backends::RedisConfig { + dsn: cfg.dsn.clone(), + max_connections: cfg.max_connections, + reinsert_on_nack: cfg.reinsert_on_nack, + queue_key: cfg.queue_key.clone(), + delayed_queue_key, + delayed_lock_key, + consumer_group: cfg.consumer_group.clone(), + consumer_name: cfg.consumer_name.clone(), + // FIXME: expose in config? + payload_key: "payload".to_string(), + ack_deadline_ms: cfg.ack_deadline_ms, + }) .make_dynamic() .build_consumer() .await @@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result { .unwrap_or_else(|| format!("{}_delays", cfg.queue_key)); let delayed_lock_key = format!("{delayed_queue_key}_lock"); - backends::RedisBackend::::builder( - backends::RedisConfig { - dsn: cfg.dsn.clone(), - max_connections: cfg.max_connections, - queue_key: cfg.queue_key.clone(), - delayed_queue_key, - delayed_lock_key, - // FIXME: expose in config? - payload_key: "payload".to_string(), - // consumer stuff we don't care about. - reinsert_on_nack: false, - consumer_group: String::new(), - consumer_name: String::new(), - ack_deadline_ms: cfg.ack_deadline_ms, - }, - ) + backends::RedisBackend::builder(backends::RedisConfig { + dsn: cfg.dsn.clone(), + max_connections: cfg.max_connections, + queue_key: cfg.queue_key.clone(), + delayed_queue_key, + delayed_lock_key, + // FIXME: expose in config? + payload_key: "payload".to_string(), + // consumer stuff we don't care about. + reinsert_on_nack: false, + consumer_group: String::new(), + consumer_name: String::new(), + ack_deadline_ms: cfg.ack_deadline_ms, + }) .make_dynamic() .build_producer() .await diff --git a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs index 533273078..d4ff4b5c7 100644 --- a/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs +++ b/bridge/svix-bridge-plugin-queue/tests/it/redis_stream_consumer.rs @@ -59,7 +59,7 @@ async fn create_test_stream(client: &Client) -> String { .take(8) .collect(); - let mut conn = client.get_async_connection().await.unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); let _: () = conn .xgroup_create_mkstream(&name, "test_cg", 0i8) @@ -70,12 +70,12 @@ async fn create_test_stream(client: &Client) -> String { } async fn delete_test_stream(client: &Client, key: &str) { - let mut conn = client.get_async_connection().await.unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); let _: () = conn.del(key).await.unwrap(); } async fn publish(client: &Client, key: &str, payload: &str) { - let mut conn = client.get_async_connection().await.unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); // N.b. the redis code relies on the messages being json with a `payload` key in there. // The `payload` key can be any valid JSON value. let _: () = conn.xadd(key, "*", &[("payload", payload)]).await.unwrap();