diff --git a/Cargo.lock b/Cargo.lock index bc2752ea2..916c052e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,18 +224,6 @@ dependencies = [ "futures-core", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue 2.5.0", - "event-listener-strategy", - "futures-core", - "pin-project-lite 0.2.13", -] - [[package]] name = "async-executor" version = "1.4.1" @@ -256,7 +244,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd8b508d585e01084059b60f06ade4cb7415cd2e4084b71dd1cb44e7d3fb9880" dependencies = [ - "async-channel 1.6.1", + "async-channel", "async-executor", "async-io", "async-lock 2.8.0", @@ -264,7 +252,7 @@ dependencies = [ "futures-lite", "once_cell", "tokio 0.2.25", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -341,7 +329,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ "async-attributes", - "async-channel 1.6.1", + "async-channel", "async-global-executor", "async-io", "async-lock 2.8.0", @@ -415,20 +403,6 @@ dependencies = [ "syn 2.0.76", ] -[[package]] -name = "async-tungstenite" -version = "0.17.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b71b31561643aa8e7df3effe284fa83ab1a840e52294c5f4bd7bfd8b2becbb" -dependencies = [ - "async-std", - "futures-io", - "futures-util", - "log", - "pin-project-lite 0.2.13", - "tungstenite", -] - [[package]] name = "atomic" version = "0.5.1" @@ -450,7 +424,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -551,13 +525,13 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.5.0", + "bytes 1.7.2", "fastrand 2.1.1", "hex", "http 0.2.12", "ring 0.17.8", "time", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", "url", "zeroize", @@ -590,7 +564,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.5.0", + "bytes 1.7.2", "fastrand 2.1.1", "http 0.2.12", "http-body 0.4.5", @@ -621,7 +595,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.5.0", + "bytes 1.7.2", "fastrand 2.1.1", "hex", "hmac", @@ -651,7 +625,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.5.0", + "bytes 1.7.2", "http 0.2.12", "once_cell", "regex-lite", @@ -673,7 +647,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.5.0", + "bytes 1.7.2", "http 0.2.12", "once_cell", "regex-lite", @@ -714,7 +688,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "crypto-bigint 0.5.5", "form_urlencoded", "hex", @@ -740,7 +714,7 @@ checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" dependencies = [ "futures-util", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -751,7 +725,7 @@ checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23" dependencies = [ "aws-smithy-http", "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "crc32c", "crc32fast", "hex", @@ -771,7 +745,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" dependencies = [ "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "crc32fast", ] @@ -784,7 +758,7 @@ dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "bytes-utils", "futures-core", "http 0.2.12", @@ -825,7 +799,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "fastrand 2.1.1", "h2 0.3.26", "http 0.2.12", @@ -838,7 +812,7 @@ dependencies = [ "pin-project-lite 0.2.13", "pin-utils", "rustls 0.21.12", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", ] @@ -850,11 +824,11 @@ checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" dependencies = [ "aws-smithy-async", "aws-smithy-types", - "bytes 1.5.0", + "bytes 1.7.2", "http 0.2.12", "http 1.1.0", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", "zeroize", ] @@ -866,7 +840,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "273dcdfd762fae3e1650b8024624e7cd50e484e37abdab73a7a706188ad34543" dependencies = [ "base64-simd", - "bytes 1.5.0", + "bytes 1.7.2", "bytes-utils", "futures-core", "http 0.2.12", @@ -881,7 +855,7 @@ dependencies = [ "ryu", "serde", "time", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", ] @@ -910,18 +884,18 @@ dependencies = [ [[package]] name = "axum" -version = "0.7.5" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", "axum-core", - "bytes 1.5.0", + "bytes 1.7.2", "futures-util", "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-util", "itoa", "matchit", @@ -936,8 +910,8 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sync_wrapper 1.0.1", - "tokio 1.35.1", - "tower", + "tokio 1.40.0", + "tower 0.5.1", "tower-layer", "tower-service", "tracing", @@ -945,12 +919,12 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" dependencies = [ "async-trait", - "bytes 1.5.0", + "bytes 1.7.2", "futures-util", "http 1.1.0", "http-body 1.0.0", @@ -958,7 +932,7 @@ dependencies = [ "mime", "pin-project-lite 0.2.13", "rustversion", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.1", "tower-layer", "tower-service", "tracing", @@ -972,7 +946,7 @@ checksum = "0be6ea09c9b96cb5076af0de2e383bd2bc0c18f827cf1967bdd353e0b910d733" dependencies = [ "axum", "axum-core", - "bytes 1.5.0", + "bytes 1.7.2", "futures-util", "headers", "http 1.1.0", @@ -981,7 +955,7 @@ dependencies = [ "mime", "pin-project-lite 0.2.13", "serde", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -1008,13 +982,13 @@ dependencies = [ "anyhow", "axum", "axum_typed_multipart_macros", - "bytes 1.5.0", + "bytes 1.7.2", "chrono", "futures-core", "futures-util", "tempfile", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", "uuid 1.4.1", ] @@ -1191,7 +1165,7 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc" dependencies = [ - "async-channel 1.6.1", + "async-channel", "async-task", "atomic-waker", "fastrand 1.7.0", @@ -1262,9 +1236,9 @@ checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" [[package]] name = "bytes" -version = "1.5.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "bytes-utils" @@ -1272,7 +1246,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "either", ] @@ -1296,7 +1270,7 @@ dependencies = [ "instant", "once_cell", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -1428,11 +1402,11 @@ version = "4.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a604e93b79d1808327a6fca85a6f2d69de66461e7620f5a4cbf5fb4d1d7c948" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-core", "memchr", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", ] @@ -1720,9 +1694,9 @@ dependencies = [ [[package]] name = "curl-sys" -version = "0.4.65+curl-8.2.1" +version = "0.4.77+curl-8.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "961ba061c9ef2fe34bbd12b807152d96f0badd2bebe7b90ce6c8c8b7572a0986" +checksum = "f469e8a5991f277a208224f6c7ad72ecb5f986e36d09ae1f2c1bb9259478a480" dependencies = [ "cc", "libc", @@ -1731,7 +1705,7 @@ dependencies = [ "openssl-sys", "pkg-config", "vcpkg", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -1870,7 +1844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16a2561fd313df162315935989dceb8c99db4ee1933358270a57a3cfb8c957f3" dependencies = [ "crossbeam-queue", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -2257,6 +2231,26 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +[[package]] +name = "fastwebsockets" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26da0c7b5cef45c521a6f9cdfffdfeb6c9f5804fbac332deb5ae254634c7a6be" +dependencies = [ + "base64 0.21.3", + "bytes 1.7.2", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "pin-project", + "rand 0.8.5", + "sha1", + "simdutf8", + "thiserror", + "tokio 1.40.0", + "utf-8", +] + [[package]] name = "fcm_v1" version = "0.3.0" @@ -2350,6 +2344,9 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" dependencies = [ + "futures-core", + "futures-sink", + "nanorand", "spin 0.9.8", ] @@ -2414,7 +2411,7 @@ checksum = "d3b2a2ac060e3266004c552235c241b481e438e2b1ea75715ea1176914ef2868" dependencies = [ "arc-swap", "async-trait", - "bytes 1.5.0", + "bytes 1.7.2", "bytes-utils", "crossbeam-queue", "float-cmp", @@ -2426,7 +2423,7 @@ dependencies = [ "redis-protocol", "semver 1.0.23", "socket2 0.5.5", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-stream", "tokio-util 0.7.2", "url", @@ -2520,7 +2517,7 @@ checksum = "45ec6fe3675af967e67c5536c0b9d44e34e6c52f86bedc4ea49c5317b8e94d06" dependencies = [ "futures-channel", "futures-task", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -2729,7 +2726,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "fnv", "futures-core", "futures-sink", @@ -2737,7 +2734,7 @@ dependencies = [ "http 0.2.12", "indexmap 2.0.1", "slab", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", "tracing", ] @@ -2749,14 +2746,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" dependencies = [ "atomic-waker", - "bytes 1.5.0", + "bytes 1.7.2", "fnv", "futures-core", "futures-sink", "http 1.1.0", "indexmap 2.0.1", "slab", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", "tracing", ] @@ -2826,7 +2823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" dependencies = [ "base64 0.21.3", - "bytes 1.5.0", + "bytes 1.7.2", "headers-core", "http 1.1.0", "httpdate", @@ -2864,6 +2861,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "hex" version = "0.4.3" @@ -2943,7 +2946,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "fnv", "itoa", ] @@ -2954,7 +2957,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "fnv", "itoa", ] @@ -2965,7 +2968,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "http 0.2.12", "pin-project-lite 0.2.13", ] @@ -2976,7 +2979,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "http 1.1.0", ] @@ -2986,7 +2989,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-util", "http 1.1.0", "http-body 1.0.0", @@ -3020,7 +3023,7 @@ version = "0.14.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-channel", "futures-core", "futures-util", @@ -3032,7 +3035,7 @@ dependencies = [ "itoa", "pin-project-lite 0.2.13", "socket2 0.5.5", - "tokio 1.35.1", + "tokio 1.40.0", "tower-service", "tracing", "want", @@ -3040,11 +3043,11 @@ dependencies = [ [[package]] name = "hyper" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-channel", "futures-util", "h2 0.4.5", @@ -3055,7 +3058,7 @@ dependencies = [ "itoa", "pin-project-lite 0.2.13", "smallvec", - "tokio 1.35.1", + "tokio 1.40.0", "want", ] @@ -3071,7 +3074,7 @@ dependencies = [ "log", "rustls 0.21.12", "rustls-native-certs", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-rustls 0.24.1", ] @@ -3083,11 +3086,11 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-util", "rustls 0.22.4", "rustls-pki-types", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-rustls 0.25.0", "tower-service", "webpki-roots 0.26.3", @@ -3099,10 +3102,10 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "hyper 0.14.30", "native-tls", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-native-tls", ] @@ -3112,32 +3115,31 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-util", "native-tls", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-native-tls", "tower-service", ] [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.4.1", "pin-project-lite 0.2.13", "socket2 0.5.5", - "tokio 1.35.1", - "tower", + "tokio 1.40.0", "tower-service", "tracing", ] @@ -3339,7 +3341,7 @@ version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "334e04b4d781f436dc315cb1e7515bd96826426345d498149e4bde36b67f8ee9" dependencies = [ - "async-channel 1.6.1", + "async-channel", "castaway", "crossbeam-utils", "curl", @@ -4020,13 +4022,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4044,7 +4047,7 @@ dependencies = [ "log", "metrics", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", "tracing-subscriber", ] @@ -4120,7 +4123,7 @@ dependencies = [ "strsim 0.10.0", "take_mut", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-rustls 0.23.4", "tokio-util 0.7.2", "trust-dns-proto", @@ -4136,7 +4139,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "encoding_rs", "futures-util", "http 0.2.12", @@ -4145,7 +4148,7 @@ dependencies = [ "memchr", "mime", "spin 0.9.8", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.6.10", "version_check", ] @@ -4156,7 +4159,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "encoding_rs", "futures-util", "http 1.1.0", @@ -4182,6 +4185,15 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -4375,7 +4387,7 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", ] @@ -5331,7 +5343,7 @@ source = "git+https://github.com/revoltchat/redis-rs?rev=1a41faf356fd21aebba71ce dependencies = [ "async-std", "async-trait", - "bytes 1.5.0", + "bytes 1.7.2", "combine", "futures-util", "itoa", @@ -5339,7 +5351,7 @@ dependencies = [ "pin-project-lite 0.2.13", "ryu", "sha1_smol", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", "url", ] @@ -5352,14 +5364,14 @@ checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" dependencies = [ "async-std", "async-trait", - "bytes 1.5.0", + "bytes 1.7.2", "combine", "futures-util", "itoa", "percent-encoding", "pin-project-lite 0.2.13", "ryu", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-util 0.7.2", "url", ] @@ -5386,7 +5398,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "bytes-utils", "cookie-factory", "crc16", @@ -5480,7 +5492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" dependencies = [ "base64 0.13.0", - "bytes 1.5.0", + "bytes 1.7.2", "encoding_rs", "futures-core", "futures-util", @@ -5500,7 +5512,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-native-tls", "url", "wasm-bindgen", @@ -5516,7 +5528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" dependencies = [ "base64 0.22.1", - "bytes 1.5.0", + "bytes 1.7.2", "encoding_rs", "futures-core", "futures-util", @@ -5524,7 +5536,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-tls 0.6.0", "hyper-util", "ipnet", @@ -5541,7 +5553,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 0.1.2", "system-configuration", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-native-tls", "tower-service", "url", @@ -5605,7 +5617,7 @@ dependencies = [ "simdutf8", "strum_macros", "tempfile", - "tokio 1.35.1", + "tokio 1.40.0", "tower-http", "tracing", "tracing-subscriber", @@ -5619,13 +5631,15 @@ dependencies = [ name = "revolt-bonfire" version = "0.7.18" dependencies = [ - "async-channel 2.3.1", - "async-std", - "async-tungstenite", "authifier", "bincode", + "fastwebsockets", + "flume", "fred", "futures", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "log", "lru 0.7.6", "lru_time_cache", @@ -5642,6 +5656,7 @@ dependencies = [ "sentry", "serde", "serde_json", + "tokio 1.40.0", "ulid 0.5.0", ] @@ -5714,7 +5729,7 @@ dependencies = [ name = "revolt-delta" version = "0.7.18" dependencies = [ - "async-channel 1.6.1", + "async-channel", "async-std", "authifier", "bitfield", @@ -5801,7 +5816,7 @@ dependencies = [ "serde", "serde_json", "tempfile", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", "tracing-subscriber", "utoipa", @@ -5889,7 +5904,7 @@ dependencies = [ "erased-serde", "http 1.1.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.4.1", "hyper-rustls 0.26.0", "hyper-util", "parking_lot", @@ -5900,7 +5915,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -6043,7 +6058,7 @@ dependencies = [ "atomic", "atty", "binascii", - "bytes 1.5.0", + "bytes 1.7.2", "either", "figment", "futures", @@ -6063,7 +6078,7 @@ dependencies = [ "state", "tempfile", "time", - "tokio 1.35.1", + "tokio 1.40.0", "tokio-stream", "tokio-util 0.7.2", "ubyte", @@ -6153,7 +6168,7 @@ dependencies = [ "stable-pattern", "state", "time", - "tokio 1.35.1", + "tokio 1.40.0", "uncased", ] @@ -6597,7 +6612,7 @@ dependencies = [ "sentry-debug-images", "sentry-panic", "sentry-tracing", - "tokio 1.35.1", + "tokio 1.40.0", "ureq", ] @@ -6962,7 +6977,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d7400c0eff44aa2fcb5e31a5f24ba9716ed90138769e4977a2ba6014ae63eb5" dependencies = [ - "async-channel 1.6.1", + "async-channel", "futures-core", "futures-io", ] @@ -7427,28 +7442,27 @@ dependencies = [ [[package]] name = "tokio" -version = "1.35.1" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", - "bytes 1.5.0", + "bytes 1.7.2", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite 0.2.13", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote 1.0.37", @@ -7462,7 +7476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -7472,7 +7486,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls 0.20.6", - "tokio 1.35.1", + "tokio 1.40.0", "webpki", ] @@ -7483,7 +7497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ "rustls 0.21.12", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -7494,7 +7508,7 @@ checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ "rustls 0.22.4", "rustls-pki-types", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -7505,7 +7519,7 @@ checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -7514,12 +7528,12 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-core", "futures-sink", "log", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", ] [[package]] @@ -7528,12 +7542,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f988a1a1adc2fb21f9c12aa96441da33a1728193ae0b95d2be22dbd17fcb4e5c" dependencies = [ - "bytes 1.5.0", + "bytes 1.7.2", "futures-core", "futures-io", "futures-sink", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tokio 1.40.0", "tracing", ] @@ -7602,7 +7616,21 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite 0.2.13", - "tokio 1.35.1", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite 0.2.13", + "sync_wrapper 0.1.2", + "tokio 1.40.0", "tower-layer", "tower-service", "tracing", @@ -7615,7 +7643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.6.0", - "bytes 1.5.0", + "bytes 1.7.2", "http 1.1.0", "http-body 1.0.0", "http-body-util", @@ -7626,15 +7654,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -7735,7 +7763,7 @@ dependencies = [ "smallvec", "thiserror", "tinyvec", - "tokio 1.35.1", + "tokio 1.40.0", "url", ] @@ -7755,7 +7783,7 @@ dependencies = [ "resolv-conf", "smallvec", "thiserror", - "tokio 1.35.1", + "tokio 1.40.0", "trust-dns-proto", ] @@ -7774,25 +7802,6 @@ dependencies = [ "core_maths", ] -[[package]] -name = "tungstenite" -version = "0.17.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes 1.5.0", - "http 0.2.12", - "httparse", - "log", - "rand 0.8.5", - "sha-1 0.10.0", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "typed-builder" version = "0.10.0" @@ -8762,7 +8771,7 @@ dependencies = [ "serde", "serde_json", "time", - "tokio 1.35.1", + "tokio 1.40.0", "tower-service", "url", ] diff --git a/crates/bonfire/Cargo.toml b/crates/bonfire/Cargo.toml index 10f935c76..91ce93079 100644 --- a/crates/bonfire/Cargo.toml +++ b/crates/bonfire/Cargo.toml @@ -15,7 +15,7 @@ ulid = "0.5.0" once_cell = "1.9.0" redis-kiss = "0.1.4" lru_time_cache = "0.11.11" -async-channel = "2.3.1" +flume = { version = "0.11.0", features = ["async"] } # parsing querystring = "1.1.0" @@ -28,12 +28,11 @@ serde = "1.0.136" # async futures = "0.3.21" -async-tungstenite = { version = "0.17.0", features = ["async-std-runtime"] } -async-std = { version = "1.8.0", features = [ - "tokio1", - "tokio02", - "attributes", -] } +fastwebsockets = { version = "0.8.0", features = ["upgrade", "unstable-split"] } +tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "net"] } +hyper = "1.4.1" +http-body-util = "0.1.2" +hyper-util = "0.1.9" # core authifier = { version = "1.0.8" } diff --git a/crates/bonfire/src/config.rs b/crates/bonfire/src/config.rs index 5642d3baa..611a21753 100644 --- a/crates/bonfire/src/config.rs +++ b/crates/bonfire/src/config.rs @@ -1,4 +1,4 @@ -use async_tungstenite::tungstenite::{handshake, Message}; +use fastwebsockets::{Frame, OpCode, Payload, upgrade::upgrade}; use futures::channel::oneshot::Sender; use revolt_database::events::client::ReadyPayloadFields; use revolt_result::{create_error, Result}; @@ -14,9 +14,9 @@ pub enum ProtocolFormat { /// User-provided protocol configuration #[derive(Debug)] pub struct ProtocolConfiguration { - protocol_version: i32, - format: ProtocolFormat, - session_token: Option, + pub(crate) protocol_version: i32, + pub(crate) format: ProtocolFormat, + pub(crate) session_token: Option, } impl ProtocolConfiguration { @@ -34,18 +34,18 @@ impl ProtocolConfiguration { } /// Decode some WebSocket message into a T: Deserialize using the client's specified protocol format - pub fn decode<'a, T: Deserialize<'a>>(&self, msg: &'a Message) -> Result { + pub fn decode<'a, T: Deserialize<'a>>(&self, frame: &'a Frame) -> Result { match self.format { ProtocolFormat::Json => { - if let Message::Text(text) = msg { - serde_json::from_str(text).map_err(|_| create_error!(InternalError)) + if let OpCode::Text = frame.opcode { + serde_json::from_slice(&frame.payload).map_err(|_| create_error!(InternalError)) } else { Err(create_error!(InternalError)) } } ProtocolFormat::Msgpack => { - if let Message::Binary(buf) = msg { - rmp_serde::from_slice(buf).map_err(|_| create_error!(InternalError)) + if let OpCode::Binary = frame.opcode { + rmp_serde::from_slice(&frame.payload).map_err(|_| create_error!(InternalError)) } else { Err(create_error!(InternalError)) } @@ -54,14 +54,16 @@ impl ProtocolConfiguration { } /// Encode T: Serialize into a WebSocket message using the client's specified protocol format - pub fn encode(&self, data: &T) -> Message { + pub fn encode(&self, data: &T) -> Frame { match self.format { ProtocolFormat::Json => { - Message::Text(serde_json::to_string(data).expect("Failed to serialise (as json).")) + let payload = serde_json::to_vec(data).expect("Failed to serialise (as json)."); + Frame::new(true, OpCode::Text, None, Payload::Owned(payload)) + } + ProtocolFormat::Msgpack => { + let payload = rmp_serde::to_vec_named(data).expect("Failed to serialise (as msgpack)."); + Frame::new(true, OpCode::Binary, None, Payload::Owned(payload)) } - ProtocolFormat::Msgpack => Message::Binary( - rmp_serde::to_vec_named(data).expect("Failed to serialise (as msgpack)."), - ), } } @@ -96,67 +98,3 @@ impl ProtocolConfiguration { ] } } - -/// Object holding one side of a channel for receiving the parsed information -pub struct WebsocketHandshakeCallback { - sender: Sender, -} - -impl WebsocketHandshakeCallback { - /// Create a callback using a given sender - pub fn from(sender: Sender) -> Self { - Self { sender } - } -} - -impl handshake::server::Callback for WebsocketHandshakeCallback { - /// Handle request to create a new WebSocket connection - fn on_request( - self, - request: &handshake::server::Request, - response: handshake::server::Response, - ) -> Result { - // Take and parse query parameters from the URI. - let query = request.uri().query().unwrap_or_default(); - let params = querystring::querify(query); - - // Set default values for the protocol. - let mut protocol_version = 1; - let mut format = ProtocolFormat::Json; - let mut session_token = None; - - // Parse and map parameters from key-value to known variables. - for (key, value) in params { - match key { - "version" => { - if let Ok(version) = value.parse() { - protocol_version = version; - } - } - "format" => match value { - "json" => format = ProtocolFormat::Json, - "msgpack" => format = ProtocolFormat::Msgpack, - _ => {} - }, - "token" => session_token = Some(value.into()), - _ => {} - } - } - - // Send configuration information back from this callback. - // We have to use a channel as this function does not borrow mutably. - if self - .sender - .send(ProtocolConfiguration { - protocol_version, - format, - session_token, - }) - .is_ok() - { - Ok(response) - } else { - Err(handshake::server::ErrorResponse::new(None)) - } - } -} diff --git a/crates/bonfire/src/events/state.rs b/crates/bonfire/src/events/state.rs index eae398019..41ab2b428 100644 --- a/crates/bonfire/src/events/state.rs +++ b/crates/bonfire/src/events/state.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use async_std::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; use lru::LruCache; use lru_time_cache::{LruCache as LruTimeCache, TimedEntry}; use revolt_database::{Channel, Member, Server, User}; diff --git a/crates/bonfire/src/main.rs b/crates/bonfire/src/main.rs index 62766cf77..d641efa8d 100644 --- a/crates/bonfire/src/main.rs +++ b/crates/bonfire/src/main.rs @@ -1,7 +1,16 @@ use std::env; +use fastwebsockets::upgrade::{IncomingUpgrade, upgrade}; +use fastwebsockets::{WebSocket, WebSocketError}; +use http_body_util::Empty; +use hyper::server::conn::http1; +use hyper::body::{Bytes, Incoming}; +use hyper::{Request, Response}; +use hyper::service::service_fn; -use async_std::net::TcpListener; +use tokio::net::TcpListener; use revolt_presence::clear_region; +use crate::config::{ProtocolConfiguration, ProtocolFormat}; +use crate::websocket::client; #[macro_use] extern crate log; @@ -12,7 +21,7 @@ pub mod events; mod database; mod websocket; -#[async_std::main] +#[tokio::main] async fn main() { // Configure requirements for Bonfire. revolt_config::configure!(events); @@ -30,10 +39,55 @@ async fn main() { // Start accepting new connections and spawn a client for each connection. while let Ok((stream, addr)) = listener.accept().await { - async_std::task::spawn(async move { + tokio::task::spawn(async move { + let io = hyper_util::rt::TokioIo::new(stream); + let conn_fut = http1::Builder::new() + .serve_connection(io, service_fn(server_upgrade)) + .with_upgrades(); info!("User connected from {addr:?}"); - websocket::client(database::get_db(), stream, addr).await; + conn_fut.await.unwrap(); info!("User disconnected from {addr:?}"); }); } } + +async fn server_upgrade(mut req: Request) -> Result>, WebSocketError> { + // Take and parse query parameters from the URI. + let query = req.uri().query().unwrap_or_default(); + let params = querystring::querify(query); + + // Set default values for the protocol. + let mut protocol_version = 1; + let mut format = ProtocolFormat::Json; + let mut session_token = None; + + // Parse and map parameters from key-value to known variables. + for (key, value) in params { + match key { + "version" => { + if let Ok(version) = value.parse() { + protocol_version = version; + } + } + "format" => match value { + "json" => format = ProtocolFormat::Json, + "msgpack" => format = ProtocolFormat::Msgpack, + _ => {} + }, + "token" => session_token = Some(value.into()), + _ => {} + } + } + + let (response, fut) = upgrade(&mut req)?; + + tokio::task::spawn(async move { + client(database::get_db(), fut, ProtocolConfiguration { + protocol_version, + format, + session_token + }).await; + }); + + Ok(response) +} diff --git a/crates/bonfire/src/websocket.rs b/crates/bonfire/src/websocket.rs index e932fb3a4..7b5e8e424 100644 --- a/crates/bonfire/src/websocket.rs +++ b/crates/bonfire/src/websocket.rs @@ -1,18 +1,22 @@ -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, sync::Arc}; -use async_tungstenite::WebSocketStream; +use fastwebsockets::upgrade::UpgradeFut; use authifier::AuthifierEvent; +use fastwebsockets::{FragmentCollectorRead, Frame, WebSocket, WebSocketError, WebSocketRead, WebSocketWrite}; use fred::{ error::{RedisError, RedisErrorKind}, interfaces::{ClientLike, EventInterface, PubsubInterface}, types::RedisConfig, }; +use flume::bounded; +use tokio::select; use futures::{ - channel::oneshot, - join, pin_mut, select, + FutureExt, + join, pin_mut, stream::{SplitSink, SplitStream}, - FutureExt, SinkExt, StreamExt, TryStreamExt, }; +use hyper::upgrade::Upgraded; +use hyper_util::rt::TokioIo; use redis_kiss::{PayloadType, REDIS_PAYLOAD_TYPE, REDIS_URI}; use revolt_config::report_internal_error; use revolt_database::{ @@ -21,55 +25,47 @@ use revolt_database::{ }; use revolt_presence::{create_session, delete_session}; -use async_std::{ +use tokio::{ net::TcpStream, sync::{Mutex, RwLock}, task::spawn, }; use revolt_result::create_error; use sentry::Level; +use tokio::io::{ReadHalf, WriteHalf}; -use crate::config::{ProtocolConfiguration, WebsocketHandshakeCallback}; +use crate::config::{ProtocolConfiguration}; use crate::events::state::{State, SubscriptionStateChange}; -type WsReader = SplitStream>; -type WsWriter = SplitSink, async_tungstenite::tungstenite::Message>; +type Ws = TokioIo; -/// Start a new WebSocket client worker given access to the database, -/// the relevant TCP stream and the remote address of the client. -pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) { +type WsReader = FragmentCollectorRead>; +type WsWriter = WebSocketWrite>; + +/// Start a new WebSocket client worker given access to the database +/// and upgrade future +pub async fn client(db: &'static Database, fut: UpgradeFut, mut config: ProtocolConfiguration) { + let mut ws = fut.await.expect("Could not upgrade"); // Upgrade the TCP connection to a WebSocket connection. // In this process, we also parse any additional parameters given. // e.g. wss://example.com?format=json&version=1 - let (sender, receiver) = oneshot::channel(); - let Ok(ws) = async_tungstenite::accept_hdr_async_with_config( - stream, - WebsocketHandshakeCallback::from(sender), - None, - ) - .await - else { - return; - }; - - // Verify we've received a valid config, otherwise we should just drop the connection. - let Ok(mut config) = receiver.await else { - return; - }; info!( - "User {addr:?} provided protocol configuration (version = {}, format = {:?})", + "User provided protocol configuration (version = {}, format = {:?})", config.get_protocol_version(), config.get_protocol_format() ); // Split the socket for simultaneously read and write. - let (mut write, mut read) = ws.split(); + let (read, mut write) = ws.split(tokio::io::split); + let mut read = FragmentCollectorRead::new(read); // If the user has not provided authentication, request information. if config.get_session_token().is_none() { - while let Ok(Some(message)) = read.try_next().await { - if let Ok(ClientMessage::Authenticate { token }) = config.decode(&message) { + while let Ok(frame) = read.read_frame::<_, WebSocketError>(&mut move |_| async { + unreachable!(); + }).await { + if let Ok(ClientMessage::Authenticate { token }) = config.decode(&frame) { config.set_session_token(token); break; } @@ -79,7 +75,7 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) // Try to authenticate the user. let Some(token) = config.get_session_token().as_ref() else { write - .send(config.encode(&create_error!(InvalidSession))) + .write_frame(config.encode(&create_error!(InvalidSession))) .await .ok(); return; @@ -88,19 +84,17 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) let (user, session_id) = match User::from_token(db, token, UserHint::Any).await { Ok(user) => user, Err(err) => { - write.send(config.encode(&err)).await.ok(); + write.write_frame(config.encode(&err)).await.ok(); return; } }; - info!("User {addr:?} authenticated as @{}", user.username); - // Create local state. let mut state = State::from(user, session_id); let user_id = state.cache.user_id.clone(); // Notify socket we have authenticated. - if report_internal_error!(write.send(config.encode(&EventV1::Authenticated)).await).is_err() { + if report_internal_error!(write.write_frame(config.encode(&EventV1::Authenticated)).await).is_err() { return; } @@ -114,7 +108,7 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) Err(_) => return, }; - if report_internal_error!(write.send(config.encode(&ready_payload)).await).is_err() { + if report_internal_error!(write.write_frame(config.encode(&ready_payload)).await).is_err() { return; } @@ -131,18 +125,17 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) let write = Mutex::new(write); let subscribed = state.subscribed.clone(); let active_servers = state.active_servers.clone(); - let (topic_signal_s, topic_signal_r) = async_channel::unbounded(); + let (topic_signal_s, topic_signal_r) = flume::unbounded(); // TODO: this needs to be rewritten // Create channels through which the tasks can signal to each other they need to clean up - let (kill_signal_1_s, kill_signal_1_r) = async_channel::bounded(1); - let (kill_signal_2_s, kill_signal_2_r) = async_channel::bounded(1); + let (kill_signal_1_s, kill_signal_1_r) = bounded(1); + let (kill_signal_2_s, kill_signal_2_r) = bounded(1); // Create a PubSub connection to poll on. let listener = listener_with_kill_signal( db, &mut state, - addr, &config, topic_signal_r, kill_signal_1_r, @@ -152,7 +145,6 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) // Read from WebSocket stream. let worker = worker_with_kill_signal( - addr, subscribed, active_servers, user_id.clone(), @@ -179,33 +171,30 @@ pub async fn client(db: &'static Database, stream: TcpStream, addr: SocketAddr) async fn listener_with_kill_signal( db: &'static Database, state: &mut State, - addr: SocketAddr, config: &ProtocolConfiguration, - topic_signal_r: async_channel::Receiver<()>, - kill_signal_r: async_channel::Receiver<()>, + topic_signal_r: flume::Receiver<()>, + kill_signal_r: flume::Receiver<()>, write: &Mutex, - kill_signal_s: async_channel::Sender<()>, + kill_signal_s: flume::Sender<()>, ) { listener( db, state, - addr, config, topic_signal_r, kill_signal_r, write, ) .await; - kill_signal_s.send(()).await.ok(); + kill_signal_s.send_async(()).await.ok(); } async fn listener( db: &'static Database, state: &mut State, - addr: SocketAddr, config: &ProtocolConfiguration, - topic_signal_r: async_channel::Receiver<()>, - kill_signal_r: async_channel::Receiver<()>, + topic_signal_r: flume::Receiver<()>, + kill_signal_r: flume::Receiver<()>, write: &Mutex, ) { let redis_config = RedisConfig::from_url(&REDIS_URI).unwrap(); @@ -221,13 +210,13 @@ async fn listener( } // Handle Redis connection dropping - let (clean_up_s, clean_up_r) = async_channel::bounded(1); + let (clean_up_s, clean_up_r) = bounded(1); let clean_up_s = Arc::new(Mutex::new(clean_up_s)); subscriber.on_error(move |err| { if let RedisErrorKind::Canceled = err.kind() { let clean_up_s = clean_up_s.clone(); spawn(async move { - clean_up_s.lock().await.send(()).await.ok(); + clean_up_s.lock().await.send_async(()).await.ok(); }); } @@ -250,13 +239,9 @@ async fn listener( } } - #[cfg(debug_assertions)] - info!("{addr:?} has reset their subscriptions"); } SubscriptionStateChange::Change { add, remove } => { for id in remove { - #[cfg(debug_assertions)] - info!("{addr:?} unsubscribing from {id}"); if report_internal_error!(subscriber.unsubscribe(id).await).is_err() { break 'out; @@ -264,8 +249,6 @@ async fn listener( } for id in add { - #[cfg(debug_assertions)] - info!("{addr:?} subscribing to {id}"); if report_internal_error!(subscriber.subscribe(id).await).is_err() { break 'out; @@ -276,9 +259,9 @@ async fn listener( } let t1 = message_rx.recv().fuse(); - let t2 = topic_signal_r.recv().fuse(); - let t3 = kill_signal_r.recv().fuse(); - let t4 = clean_up_r.recv().fuse(); + let t2 = topic_signal_r.recv_async().fuse(); + let t3 = kill_signal_r.recv_async().fuse(); + let t4 = clean_up_r.recv_async().fuse(); pin_mut!(t1, t2, t3, t4); @@ -351,11 +334,11 @@ async fn listener( } } - let result = write.lock().await.send(config.encode(&event)).await; + let result = write.lock().await.write_frame(config.encode(&event)).await; if let Err(e) = result { - use async_tungstenite::tungstenite::Error; - if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) { - let err = format!("Error while sending an event to {addr:?}: {e:?}"); + use fastwebsockets::WebSocketError; + if !matches!(e, WebSocketError::ConnectionClosed) { + let err = format!("Error while sending an event to client: {e:?}"); warn!("{}", err); sentry::capture_message(&err, Level::Warning); } @@ -364,7 +347,6 @@ async fn listener( } if let EventV1::Logout = event { - info!("User {addr:?} received log out event!"); break 'out; } } @@ -376,19 +358,17 @@ async fn listener( #[allow(clippy::too_many_arguments)] async fn worker_with_kill_signal( - addr: SocketAddr, subscribed: Arc>>, active_servers: Arc>>, user_id: String, config: &ProtocolConfiguration, - topic_signal_s: async_channel::Sender<()>, - kill_signal_r: async_channel::Receiver<()>, + topic_signal_s: flume::Sender<()>, + kill_signal_r: flume::Receiver<()>, read: WsReader, write: &Mutex, - kill_signal_s: async_channel::Sender<()>, + kill_signal_s: flume::Sender<()>, ) { worker( - addr, subscribed, active_servers, user_id, @@ -399,24 +379,26 @@ async fn worker_with_kill_signal( write, ) .await; - kill_signal_s.send(()).await.ok(); + kill_signal_s.send_async(()).await.ok(); } #[allow(clippy::too_many_arguments)] async fn worker( - addr: SocketAddr, subscribed: Arc>>, active_servers: Arc>>, user_id: String, config: &ProtocolConfiguration, - topic_signal_s: async_channel::Sender<()>, - kill_signal_r: async_channel::Receiver<()>, + topic_signal_s: flume::Sender<()>, + kill_signal_r: flume::Receiver<()>, mut read: WsReader, write: &Mutex, ) { + let send_fn = &mut move |_| async { + unreachable!(); + }; loop { - let t1 = read.try_next().fuse(); - let t2 = kill_signal_r.recv().fuse(); + let t1 = read.read_frame::<_, WebSocketError>(send_fn); + let t2 = kill_signal_r.recv_async().fuse(); pin_mut!(t1, t2); @@ -426,16 +408,11 @@ async fn worker( }, result = t1 => { let msg = match result { - Ok(Some(msg)) => msg, - Ok(None) => { - warn!("Received a None message!"); - sentry::capture_message("Received a None message!", Level::Warning); - return; - } + Ok(msg) => msg, Err(e) => { - use async_tungstenite::tungstenite::Error; - if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) { - let err = format!("Error while reading an event from {addr:?}: {e:?}"); + use fastwebsockets::WebSocketError; + if !matches!(e, WebSocketError::ConnectionClosed) { + let err = format!("Error while reading an event from client: {e:?}"); warn!("{}", err); sentry::capture_message(&err, Level::Warning); } @@ -480,7 +457,7 @@ async fn worker( if !has_item { // Poke the listener to adjust subscriptions - topic_signal_s.send(()).await.ok(); + topic_signal_s.send_async(()).await.ok(); } } ClientMessage::Ping { data, responded } => { @@ -488,7 +465,7 @@ async fn worker( write .lock() .await - .send(config.encode(&EventV1::Pong { data })) + .write_frame(config.encode(&EventV1::Pong { data })) .await .ok(); }