diff --git a/Cargo.lock b/Cargo.lock index ce167030c351c..0bdcd68fb9a9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -646,6 +646,40 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-nats" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8257238e2a3629ee5618502a75d1b91f8017c24638c75349fc8d2d80cf1f7c4c" +dependencies = [ + "base64 0.21.2", + "bytes 1.4.0", + "futures 0.3.28", + "http", + "itoa", + "memchr", + "nkeys", + "nuid", + "once_cell", + "rand 0.8.5", + "regex", + "ring", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror", + "time", + "tokio", + "tokio-retry", + "tokio-rustls 0.24.0", + "tracing 0.1.37", + "url", +] + [[package]] name = "async-net" version = "1.7.0" @@ -1464,15 +1498,6 @@ dependencies = [ "simd-abstraction", ] -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.5.3" @@ -1599,8 +1624,8 @@ dependencies = [ "log", "pin-project-lite", "rustls 0.20.7", - "rustls-native-certs 0.6.2", - "rustls-pemfile 1.0.1", + "rustls-native-certs", + "rustls-pemfile", "serde", "serde_derive", "serde_json", @@ -1610,7 +1635,7 @@ dependencies = [ "tokio", "tokio-util", "url", - "webpki 0.22.0", + "webpki", "webpki-roots", "winapi", ] @@ -4180,7 +4205,7 @@ dependencies = [ "hyper", "log", "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls-native-certs", "tokio", "tokio-rustls 0.23.4", ] @@ -4193,7 +4218,7 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper", - "rustls 0.21.0", + "rustls 0.21.6", "tokio", "tokio-rustls 0.24.0", ] @@ -4521,12 +4546,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" - [[package]] name = "json-patch" version = "1.0.0" @@ -5256,7 +5275,7 @@ dependencies = [ "rand 0.8.5", "rustc_version_runtime", "rustls 0.20.7", - "rustls-pemfile 1.0.1", + "rustls-pemfile", "serde", "serde_bytes", "serde_with 1.14.0", @@ -5319,42 +5338,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nats" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eebb39ba0555bcf232817d42ed3346499f14f6f8d4de1c0ca4517bda99c1a7b" -dependencies = [ - "base64 0.13.1", - "base64-url", - "blocking", - "crossbeam-channel", - "fastrand", - "itoa", - "json", - "lazy_static", - "libc", - "log", - "memchr", - "nkeys 0.2.0", - "nuid", - "once_cell", - "parking_lot", - "regex", - "ring", - "rustls 0.19.1", - "rustls-native-certs 0.5.0", - "rustls-pemfile 0.2.1", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "time", - "url", - "webpki 0.21.4", - "winapi", -] - [[package]] name = "ndarray" version = "0.15.6" @@ -5429,21 +5412,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "nkeys" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" -dependencies = [ - "byteorder", - "data-encoding", - "ed25519-dalek", - "getrandom 0.2.10", - "log", - "rand 0.8.5", - "signatory", -] - [[package]] name = "nkeys" version = "0.3.1" @@ -7076,8 +7044,8 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.0", - "rustls-pemfile 1.0.1", + "rustls 0.21.6", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", @@ -7306,19 +7274,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64 0.13.1", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.7" @@ -7327,32 +7282,20 @@ checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", + "sct", + "webpki", ] [[package]] name = "rustls" -version = "0.21.0" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", "rustls-webpki", - "sct 0.7.0", -] - -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls 0.19.1", - "schannel", - "security-framework", + "sct", ] [[package]] @@ -7362,34 +7305,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.1", + "rustls-pemfile", "schannel", "security-framework", ] [[package]] name = "rustls-pemfile" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" -dependencies = [ - "base64 0.13.1", -] - -[[package]] -name = "rustls-pemfile" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.13.1", + "base64 0.21.2", ] [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.101.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" dependencies = [ "ring", "untrusted", @@ -7506,16 +7440,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -7673,9 +7597,9 @@ dependencies = [ [[package]] name = "serde_nanos" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44969a61f5d316be20a42ff97816efb3b407a924d06824c3d8a49fa8450de0e" +checksum = "8ae801b7733ca8d6a2b580debe99f67f36826a0f5b8a36055dc6bc40f8d6bc71" dependencies = [ "serde", ] @@ -7702,13 +7626,13 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.9" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fe39d9fbb0ebf5eb2c7cb7e2a47e4f462fad1379f1166b8ae49ad9eae89a7ca" +checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" dependencies = [ "proc-macro2 1.0.66", "quote 1.0.32", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -8342,7 +8266,7 @@ checksum = "09a4b0a70bac0a58ca6a7659d1328e34ee462339c70b0fa49f72bad1f278910a" dependencies = [ "cfg-if", "native-tls", - "rustls-pemfile 1.0.1", + "rustls-pemfile", ] [[package]] @@ -8484,9 +8408,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.17" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" dependencies = [ "itoa", "libc", @@ -8498,15 +8422,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.6" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" dependencies = [ "time-core", ] @@ -8662,7 +8586,7 @@ checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls 0.20.7", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -8671,7 +8595,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" dependencies = [ - "rustls 0.21.0", + "rustls 0.21.6", "tokio", ] @@ -8720,7 +8644,7 @@ checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", - "rustls 0.21.0", + "rustls 0.21.6", "tokio", "tungstenite 0.20.0", ] @@ -8805,8 +8729,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls-native-certs 0.6.2", - "rustls-pemfile 1.0.1", + "rustls-native-certs", + "rustls-pemfile", "tokio", "tokio-rustls 0.24.0", "tokio-stream", @@ -9479,6 +9403,7 @@ dependencies = [ "async-compression", "async-graphql 6.0.0", "async-graphql-warp", + "async-nats", "async-stream", "async-trait", "atty", @@ -9566,9 +9491,8 @@ dependencies = [ "metrics-tracing-context", "mlua", "mongodb", - "nats", "nix 0.26.2", - "nkeys 0.3.1", + "nkeys", "nom", "notify", "num-format", @@ -10125,7 +10049,7 @@ dependencies = [ "mime_guess", "percent-encoding", "pin-project", - "rustls-pemfile 1.0.1", + "rustls-pemfile", "scoped-tls", "serde", "serde_json", @@ -10256,16 +10180,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" @@ -10282,7 +10196,7 @@ version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" dependencies = [ - "webpki 0.22.0", + "webpki", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 78f416829b22a..20a3948a620e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,7 +287,7 @@ lru = { version = "0.11.0", default-features = false, optional = true } maxminddb = { version = "0.23.0", default-features = false, optional = true } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.6.0", default-features = false, features = ["tokio-runtime"], optional = true } -nats = { version = "0.24.0", default-features = false, optional = true } +async-nats = { version = "0.31.0", default-features = false, optional = true } nkeys = { version = "0.3.1", default-features = false, optional = true } nom = { version = "7.1.3", default-features = false, optional = true } notify = { version = "6.0.1", default-features = false, features = ["macos_fsevent"] } @@ -542,7 +542,7 @@ sources-kafka = ["dep:rdkafka"] sources-kubernetes_logs = ["dep:file-source", "kubernetes", "transforms-reduce"] sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"] sources-mongodb_metrics = ["dep:mongodb"] -sources-nats = ["dep:nats", "dep:nkeys"] +sources-nats = ["dep:async-nats", "dep:nkeys"] sources-nginx_metrics = ["dep:nom"] sources-opentelemetry = ["dep:hex", "dep:opentelemetry-proto", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-vector"] sources-postgresql_metrics = ["dep:postgres-openssl", "dep:tokio-postgres"] @@ -699,7 +699,7 @@ sinks-influxdb = [] sinks-kafka = ["dep:rdkafka"] sinks-mezmo = [] sinks-loki = ["loki-logproto"] -sinks-nats = ["dep:nats", "dep:nkeys"] +sinks-nats = ["dep:async-nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index b6d5a876eae7e..c99a0ece7a912 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -35,6 +35,7 @@ async-global-executor,https://github.com/Keruspe/async-global-executor,Apache-2. async-graphql,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli , Koxiaet" async-io,https://github.com/smol-rs/async-io,Apache-2.0 OR MIT,Stjepan Glavina async-lock,https://github.com/smol-rs/async-lock,Apache-2.0 OR MIT,Stjepan Glavina +async-nats,https://github.com/nats-io/nats.rs,Apache-2.0,"Tomasz Pietrek , Casper Beyer " async-net,https://github.com/smol-rs/async-net,Apache-2.0 OR MIT,Stjepan Glavina async-process,https://github.com/smol-rs/async-process,Apache-2.0 OR MIT,Stjepan Glavina async-reactor-trait,https://github.com/amqp-rs/reactor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou @@ -82,7 +83,6 @@ backtrace,https://github.com/rust-lang/backtrace-rs,MIT OR Apache-2.0,The Rust P base16,https://github.com/thomcc/rust-base16,CC0-1.0,Thom Chiovoloni base64,https://github.com/marshallpierce/rust-base64,MIT OR Apache-2.0,"Alice Maz , Marshall Pierce " base64-simd,https://github.com/Nugine/simd,MIT,The base64-simd Authors -base64-url,https://github.com/magiclen/base64-url,MIT,Magic Len base64ct,https://github.com/RustCrypto/formats/tree/master/base64ct,Apache-2.0 OR MIT,RustCrypto Developers bit-set,https://github.com/contain-rs/bit-set,MIT OR Apache-2.0,Alexis Beingessner bit-vec,https://github.com/contain-rs/bit-vec,MIT OR Apache-2.0,Alexis Beingessner @@ -143,7 +143,6 @@ crc,https://github.com/mrhooray/crc-rs,MIT OR Apache-2.0,"Rui Hu crc32c,https://github.com/zowens/crc32c,Apache-2.0 OR MIT,Zack Owens crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs , Alex Crichton " -crossbeam-channel,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-channel Authors crossbeam-epoch,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-epoch Authors crossbeam-queue,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-queue Authors crossbeam-utils,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-utils Authors @@ -283,7 +282,6 @@ itoa,https://github.com/dtolnay/itoa,MIT OR Apache-2.0,David Tolnay jni-sys,https://github.com/sfackler/rust-jni-sys,MIT OR Apache-2.0,Steven Fackler js-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/js-sys,MIT OR Apache-2.0,The wasm-bindgen Developers -json,https://github.com/maciejhirsz/json-rust,MIT OR Apache-2.0,Maciej Hirsz json-patch,https://github.com/idubrov/json-patch,MIT OR Apache-2.0,Ivan Dubrov jsonpath_lib,https://github.com/freestrings/jsonpath,MIT,Changseok Han k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnavion @@ -335,7 +333,6 @@ mlua,https://github.com/khvzak/mlua,MIT,"Aleksandr Orlenko , kyren mongodb,https://github.com/mongodb/mongo-rust-driver,Apache-2.0,"Saghm Rossi , Patrick Freed , Isabel Atkinson , Abraham Egnor , Kaitlin Mahar " multer,https://github.com/rousan/multer-rs,MIT,Rousan Ali native-tls,https://github.com/sfackler/rust-native-tls,MIT OR Apache-2.0,Steven Fackler -nats,https://github.com/nats-io/nats.rs,Apache-2.0,"Derek Collison , Tyler Neely , Stjepan Glavina " ndk-context,https://github.com/rust-windowing/android-ndk-rs,MIT OR Apache-2.0,The Rust Windowing contributors nibble_vec,https://github.com/michaelsproul/rust_nibble_vec,MIT,Michael Sproul nix,https://github.com/nix-rust/nix,MIT,The nix-rust Project Developers @@ -455,10 +452,8 @@ rustc-hash,https://github.com/rust-lang-nursery/rustc-hash,Apache-2.0 OR MIT,The rustc_version,https://github.com/Kimundi/rustc-version-rs,MIT OR Apache-2.0,Marvin Löbel rustc_version_runtime,https://github.com/seppo0010/rustc-version-runtime-rs,MIT,Sebastian Waisbrot rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman , Jakub Konka " -rustls,https://github.com/ctz/rustls,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors rustls-native-certs,https://github.com/ctz/rustls-native-certs,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton -rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,The rustls-pemfile Authors rustls-webpki,https://github.com/rustls/webpki,ISC,The rustls-webpki Authors rustversion,https://github.com/dtolnay/rustversion,MIT OR Apache-2.0,David Tolnay diff --git a/src/internal_events/nats.rs b/src/internal_events/nats.rs index 94e06da098108..c68c756a3a377 100644 --- a/src/internal_events/nats.rs +++ b/src/internal_events/nats.rs @@ -1,5 +1,3 @@ -use std::io::Error; - use crate::emit; use metrics::counter; use vector_common::internal_event::{ @@ -7,11 +5,9 @@ use vector_common::internal_event::{ }; use vector_core::internal_event::InternalEvent; -use super::prelude::io_error_code; - #[derive(Debug)] pub struct NatsEventSendError { - pub error: Error, + pub error: async_nats::Error, } impl InternalEvent for NatsEventSendError { @@ -21,14 +17,12 @@ impl InternalEvent for NatsEventSendError { message = reason, error = %self.error, error_type = error_type::WRITER_FAILED, - error_code = io_error_code(&self.error), stage = error_stage::SENDING, internal_log_rate_limit = true, ); counter!( "component_errors_total", 1, "error_type" => error_type::WRITER_FAILED, - "error_code" => io_error_code(&self.error), "stage" => error_stage::SENDING, ); emit!(ComponentEventsDropped:: { count: 1, reason }); diff --git a/src/nats.rs b/src/nats.rs index 5bad2c9182bd0..5d20516d49939 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -13,6 +13,8 @@ pub enum NatsConfigError { TlsMissingKey, #[snafu(display("NATS TLS Config Error: missing cert"))] TlsMissingCert, + #[snafu(display("NATS Credentials file error"))] + CredentialsFileError { source: std::io::Error }, } /// Configuration of the authentication strategy when interacting with NATS. @@ -115,30 +117,27 @@ pub(crate) struct NatsAuthNKey { } impl NatsAuthConfig { - pub(crate) fn to_nats_options(&self) -> Result { + pub(crate) fn to_nats_options(&self) -> Result { match self { NatsAuthConfig::UserPassword { user_password } => { - Ok(nats::asynk::Options::with_user_pass( - user_password.user.as_str(), - user_password.password.inner(), + Ok(async_nats::ConnectOptions::with_user_and_password( + user_password.user.clone(), + user_password.password.inner().to_string(), )) } - NatsAuthConfig::CredentialsFile { credentials_file } => Ok( - nats::asynk::Options::with_credentials(&credentials_file.path), - ), - NatsAuthConfig::Nkey { nkey } => nkeys::KeyPair::from_seed(&nkey.seed) - .context(AuthConfigSnafu) - .map(|kp| { - // The following unwrap is safe because the only way the sign method can fail is if - // keypair does not contain a seed. We are constructing the keypair from a seed in - // the preceding line. - nats::asynk::Options::with_nkey(&nkey.nkey, move |nonce| { - kp.sign(nonce).unwrap() - }) - }), - NatsAuthConfig::Token { token } => { - Ok(nats::asynk::Options::with_token(token.value.inner())) + NatsAuthConfig::CredentialsFile { credentials_file } => { + async_nats::ConnectOptions::with_credentials( + &std::fs::read_to_string(credentials_file.path.clone()) + .context(CredentialsFileSnafu)?, + ) + .context(CredentialsFileSnafu) } + NatsAuthConfig::Nkey { nkey } => { + Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone())) + } + NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token( + token.value.inner().to_string(), + )), } } } @@ -147,35 +146,33 @@ pub(crate) fn from_tls_auth_config( connection_name: &str, auth_config: &Option, tls_config: &Option, -) -> Result { +) -> Result { let nats_options = match &auth_config { - None => nats::asynk::Options::new(), + None => async_nats::ConnectOptions::new(), Some(auth) => auth.to_nats_options()?, }; - let nats_options = nats_options - .with_name(connection_name) - // Set reconnect_buffer_size on the nats client to 0 bytes so that the - // client doesn't buffer internally (to avoid message loss). - .reconnect_buffer_size(0); + let nats_options = nats_options.name(connection_name); match tls_config { None => Ok(nats_options), Some(tls_config) => { let tls_enabled = tls_config.enabled.unwrap_or(false); - let nats_options = nats_options.tls_required(tls_enabled); + let nats_options = nats_options.require_tls(tls_enabled); if !tls_enabled { return Ok(nats_options); } let nats_options = match &tls_config.options.ca_file { None => nats_options, - Some(ca_file) => nats_options.add_root_certificate(ca_file), + Some(ca_file) => nats_options.add_root_certificates(ca_file.clone()), }; let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) { (None, None) => nats_options, - (Some(crt_file), Some(key_file)) => nats_options.client_cert(crt_file, key_file), + (Some(crt_file), Some(key_file)) => { + nats_options.add_client_certificate(crt_file.clone(), key_file.clone()) + } (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey), (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert), }; @@ -188,7 +185,7 @@ pub(crate) fn from_tls_auth_config( mod tests { use super::*; - fn parse_auth(s: &str) -> Result { + fn parse_auth(s: &str) -> Result { toml::from_str(s) .map_err(Into::into) .and_then(|config: NatsAuthConfig| config.to_nats_options().map_err(Into::into)) @@ -266,7 +263,7 @@ mod tests { parse_auth( r#" strategy = "credentials_file" - credentials_file.path = "/path/to/nowhere" + credentials_file.path = "tests/data/nats/nats.creds" "#, ) .unwrap(); diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index e80f28245a631..02265df21972b 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -33,7 +33,7 @@ enum BuildError { #[snafu(display("NATS Config Error: {}", source))] Config { source: NatsConfigError }, #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: std::io::Error }, + Connect { source: async_nats::ConnectError }, } /** @@ -137,7 +137,7 @@ impl SinkConfig for NatsSinkConfig { } } -impl std::convert::TryFrom<&NatsSinkConfig> for nats::asynk::Options { +impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { type Error = NatsConfigError; fn try_from(config: &NatsSinkConfig) -> Result { @@ -146,8 +146,8 @@ impl std::convert::TryFrom<&NatsSinkConfig> for nats::asynk::Options { } impl NatsSinkConfig { - async fn connect(&self) -> Result { - let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; + async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; options.connect(&self.url).await.context(ConnectSnafu) } @@ -160,7 +160,7 @@ async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { pub struct NatsSink { transformer: Transformer, encoder: Encoder<()>, - connection: nats::asynk::Connection, + connection: async_nats::Client, subject: Template, } @@ -213,7 +213,14 @@ impl StreamSink for NatsSink { continue; } - match self.connection.publish(&subject, &bytes).await { + let message_size = bytes.len(); + match self + .connection + .publish(subject.clone(), bytes.freeze()) + .map_err(Into::into) + .and_then(|_| self.connection.flush().map_err(Into::into)) + .await + { Err(error) => { finalizers.update_status(EventStatus::Errored); @@ -223,7 +230,7 @@ impl StreamSink for NatsSink { finalizers.update_status(EventStatus::Delivered); events_sent.emit(CountByteSize(1, event_byte_size)); - bytes_sent.emit(ByteSize(bytes.len())); + bytes_sent.emit(ByteSize(message_size)); } } } @@ -246,7 +253,7 @@ mod tests { #[cfg(test)] mod integration_tests { use codecs::TextSerializerConfig; - use std::{thread, time::Duration}; + use std::time::Duration; use super::*; use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword}; @@ -274,10 +281,14 @@ mod integration_tests { .connect() .await .expect("failed to connect with test consumer"); - let sub = consumer - .subscribe(&subject) + let mut sub = consumer + .subscribe(subject) .await .expect("failed to subscribe with test consumer"); + consumer + .flush() + .await + .expect("failed to flush with the test consumer"); // Publish events. let num_events = 1_000; @@ -286,12 +297,12 @@ mod integration_tests { run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; // Unsubscribe from the channel. - thread::sleep(Duration::from_secs(3)); - sub.drain().await.unwrap(); + tokio::time::sleep(Duration::from_secs(3)).await; + sub.unsubscribe().await.unwrap(); let mut output: Vec = Vec::new(); while let Some(msg) = sub.next().await { - output.push(String::from_utf8_lossy(&msg.data).to_string()) + output.push(String::from_utf8_lossy(&msg.payload).to_string()) } assert_eq!(output.len(), input.len()); @@ -501,7 +512,7 @@ mod integration_tests { let r = publish_and_check(conf).await; assert!( - matches!(r, Err(BuildError::Config { .. })), + matches!(r, Err(BuildError::Connect { .. })), "publish_and_check failed, expected BuildError::Config, got: {:?}", r ); diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 2d893a9bee642..2549a4771e40f 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -1,6 +1,6 @@ use chrono::Utc; use codecs::decoding::{DeserializerConfig, FramingConfig, StreamDecodingError}; -use futures::{pin_mut, stream, Stream, StreamExt}; +use futures::{pin_mut, StreamExt}; use lookup::{lookup_v2::OptionalValuePath, owned_value_path}; use snafu::{ResultExt, Snafu}; use tokio_util::codec::FramedRead; @@ -31,9 +31,9 @@ enum BuildError { #[snafu(display("NATS Config Error: {}", source))] Config { source: NatsConfigError }, #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: std::io::Error }, + Connect { source: async_nats::ConnectError }, #[snafu(display("NATS Subscribe Error: {}", source))] - Subscribe { source: std::io::Error }, + Subscribe { source: async_nats::SubscribeError }, } /// Configuration for the `nats` source. @@ -167,13 +167,13 @@ impl SourceConfig for NatsSourceConfig { } impl NatsSourceConfig { - async fn connect(&self) -> Result { - let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; + async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; options.connect(&self.url).await.context(ConnectSnafu) } } -impl TryFrom<&NatsSourceConfig> for nats::asynk::Options { +impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions { type Error = NatsConfigError; fn try_from(config: &NatsSourceConfig) -> Result { @@ -181,31 +181,23 @@ impl TryFrom<&NatsSourceConfig> for nats::asynk::Options { } } -fn get_subscription_stream( - subscription: nats::asynk::Subscription, -) -> impl Stream { - stream::unfold(subscription, |subscription| async move { - subscription.next().await.map(|msg| (msg, subscription)) - }) -} - async fn nats_source( config: NatsSourceConfig, // Take ownership of the connection so it doesn't get dropped. - _connection: nats::asynk::Connection, - subscription: nats::asynk::Subscription, + _connection: async_nats::Client, + subscriber: async_nats::Subscriber, decoder: Decoder, log_namespace: LogNamespace, shutdown: ShutdownSignal, mut out: SourceSender, ) -> Result<(), ()> { let events_received = register!(EventsReceived); - let stream = get_subscription_stream(subscription).take_until(shutdown); + let stream = subscriber.take_until(shutdown); pin_mut!(stream); let bytes_received = register!(BytesReceived::from(Protocol::TCP)); while let Some(msg) = stream.next().await { - bytes_received.emit(ByteSize(msg.data.len())); - let mut stream = FramedRead::new(msg.data.as_ref(), decoder.clone()); + bytes_received.emit(ByteSize(msg.payload.len())); + let mut stream = FramedRead::new(msg.payload.as_ref(), decoder.clone()); while let Some(next) = stream.next().await { match next { Ok((events, _byte_size)) => { @@ -258,12 +250,15 @@ async fn nats_source( async fn create_subscription( config: &NatsSourceConfig, -) -> Result<(nats::asynk::Connection, nats::asynk::Subscription), BuildError> { +) -> Result<(async_nats::Client, async_nats::Subscriber), BuildError> { let nc = config.connect().await?; let subscription = match &config.queue { - None => nc.subscribe(&config.subject).await, - Some(queue) => nc.queue_subscribe(&config.subject, queue).await, + None => nc.subscribe(config.subject.clone()).await, + Some(queue) => { + nc.queue_subscribe(config.subject.clone(), queue.clone()) + .await + } }; let subscription = subscription.context(SubscribeSnafu)?; @@ -350,6 +345,7 @@ mod tests { mod integration_tests { #![allow(clippy::print_stdout)] //tests + use bytes::Bytes; use vector_core::config::log_schema; use super::*; @@ -385,7 +381,10 @@ mod integration_tests { ShutdownSignal::noop(), tx, )); - nc_pub.publish(&subject, msg).await.unwrap(); + nc_pub + .publish(subject, Bytes::from_static(msg.as_bytes())) + .await + .unwrap(); collect_n(rx, 1).await }) @@ -610,7 +609,7 @@ mod integration_tests { let r = publish_and_check(conf).await; assert!( - matches!(r, Err(BuildError::Config { .. })), + matches!(r, Err(BuildError::Connect { .. })), "publish_and_check failed, expected BuildError::Config, got: {:?}", r );