From afdc66e08fc46a990e7f0c65a8a1a540de8ef52b Mon Sep 17 00:00:00 2001 From: Kunal Mohan <44079328+kunalmohan@users.noreply.github.com> Date: Sat, 19 Aug 2023 00:29:18 +0530 Subject: [PATCH] enhancement(http_server source): Configurable http response code (#18208) * make http response code customizable for http-server source * fix tests * update docs * Apply suggestions from code review Co-authored-by: neuronull * update generated docs * add unit test * fix clippy errors * update licenses * fix cue --------- Co-authored-by: neuronull --- Cargo.lock | 12 ++++ Cargo.toml | 1 + LICENSE-3rdparty.csv | 1 + lib/vector-config/Cargo.toml | 1 + lib/vector-config/src/http.rs | 35 ++++++++++ lib/vector-config/src/lib.rs | 1 + src/sources/heroku_logs.rs | 1 + src/sources/http_server.rs | 67 ++++++++++++++++++- src/sources/prometheus/remote_write.rs | 1 + src/sources/util/http/prelude.rs | 11 +-- .../components/sources/base/http.cue | 10 +++ .../components/sources/base/http_server.cue | 10 +++ 12 files changed, 146 insertions(+), 5 deletions(-) create mode 100644 lib/vector-config/src/http.rs diff --git a/Cargo.lock b/Cargo.lock index 35ae708e34fbf..8f6872036f219 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4110,6 +4110,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" +[[package]] +name = "http-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" +dependencies = [ + "http", + "serde", +] + [[package]] name = "http-types" version = "2.12.0" @@ -9477,6 +9487,7 @@ dependencies = [ "hostname", "http", "http-body", + "http-serde", "hyper", "hyper-openssl", "hyper-proxy", @@ -9701,6 +9712,7 @@ dependencies = [ "chrono", "chrono-tz", "encoding_rs", + "http", "indexmap 2.0.0", "inventory", "no-proxy", diff --git a/Cargo.toml b/Cargo.toml index d90e138e086e3..25d36061e7707 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -275,6 +275,7 @@ hashbrown = { version = "0.14.0", default-features = false, optional = true, fea headers = { version = "0.3.8", default-features = false } hostname = { version = "0.3.1", default-features = false } http = { version = "0.2.9", default-features = false } +http-serde = "1.1.2" http-body = { version = "0.4.5", default-features = false } hyper = { version = "0.14.27", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream"] } hyper-openssl = { version = "0.9.2", default-features = false } diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c8f5814fbca1a..3f4f81b68a599 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -252,6 +252,7 @@ hostname,https://github.com/svartalf/hostname,MIT,"fengcen , Carl Lerche , Sean McArthur " http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " http-range-header,https://github.com/MarcusGrass/parse-range-headers,MIT,The http-range-header Authors +http-serde,https://gitlab.com/kornelski/http-serde,Apache-2.0 OR MIT,Kornel http-types,https://github.com/http-rs/http-types,MIT OR Apache-2.0,Yoshua Wuyts httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch diff --git a/lib/vector-config/Cargo.toml b/lib/vector-config/Cargo.toml index c592fe580fef3..d8ff34b2e41de 100644 --- a/lib/vector-config/Cargo.toml +++ b/lib/vector-config/Cargo.toml @@ -26,6 +26,7 @@ snafu = { version = "0.7.5", default-features = false } toml = { version = "0.7.6", default-features = false } tracing = { version = "0.1.34", default-features = false } url = { version = "2.4.0", default-features = false, features = ["serde"] } +http = { version = "0.2.9", default-features = false } vrl.workspace = true vector-config-common = { path = "../vector-config-common" } vector-config-macros = { path = "../vector-config-macros" } diff --git a/lib/vector-config/src/http.rs b/lib/vector-config/src/http.rs new file mode 100644 index 0000000000000..17be823be7638 --- /dev/null +++ b/lib/vector-config/src/http.rs @@ -0,0 +1,35 @@ +use http::StatusCode; +use serde_json::Value; +use std::cell::RefCell; + +use crate::{ + schema::{generate_number_schema, SchemaGenerator, SchemaObject}, + Configurable, GenerateError, Metadata, ToValue, +}; + +impl ToValue for StatusCode { + fn to_value(&self) -> Value { + serde_json::to_value(self.as_u16()).expect("Could not convert HTTP status code to JSON") + } +} + +impl Configurable for StatusCode { + fn referenceable_name() -> Option<&'static str> { + Some("http::StatusCode") + } + + fn is_optional() -> bool { + true + } + + fn metadata() -> Metadata { + let mut metadata = Metadata::default(); + metadata.set_description("HTTP response status code"); + metadata.set_default_value(StatusCode::OK); + metadata + } + + fn generate_schema(_: &RefCell) -> Result { + Ok(generate_number_schema::()) + } +} diff --git a/lib/vector-config/src/lib.rs b/lib/vector-config/src/lib.rs index 38904b60c6832..b18a6897505fd 100644 --- a/lib/vector-config/src/lib.rs +++ b/lib/vector-config/src/lib.rs @@ -117,6 +117,7 @@ pub use self::configurable::{Configurable, ConfigurableRef, ToValue}; mod errors; pub use self::errors::{BoundDirection, GenerateError}; mod external; +mod http; mod metadata; pub use self::metadata::Metadata; mod named; diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 4ffb1cdc5169d..fd728def7b254 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -176,6 +176,7 @@ impl SourceConfig for LogplexConfig { self.address, "events", HttpMethod::Post, + StatusCode::OK, true, &self.tls, &self.auth, diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 89da17c13e333..0be8b5e4e2117 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -9,6 +9,7 @@ use codecs::{ }; use http::{StatusCode, Uri}; +use http_serde; use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; use tokio_util::codec::Decoder as _; use vector_config::configurable_component; @@ -129,6 +130,13 @@ pub struct SimpleHttpConfig { #[serde(default = "default_http_method")] method: HttpMethod, + /// Specifies the HTTP response status code that will be returned on successful requests. + #[configurable(metadata(docs::examples = 202))] + #[configurable(metadata(docs::numeric_type = "uint"))] + #[serde(with = "http_serde::status_code")] + #[serde(default = "default_http_response_code")] + response_code: StatusCode, + #[configurable(derived)] tls: Option, @@ -242,6 +250,7 @@ impl Default for SimpleHttpConfig { path: default_path(), path_key: default_path_key(), method: default_http_method(), + response_code: default_http_response_code(), strict_path: true, framing: None, decoding: Some(default_decoding()), @@ -289,6 +298,10 @@ fn default_path_key() -> OptionalValuePath { OptionalValuePath::from(owned_value_path!("path")) } +const fn default_http_response_code() -> StatusCode { + StatusCode::OK +} + /// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed. fn remove_duplicates(mut list: Vec, list_name: &str) -> Vec { list.sort(); @@ -328,6 +341,7 @@ impl SourceConfig for SimpleHttpConfig { self.address, self.path.as_str(), self.method, + self.response_code, self.strict_path, &self.tls, &self.auth, @@ -478,7 +492,7 @@ mod tests { Compression, }; use futures::Stream; - use http::{HeaderMap, Method}; + use http::{HeaderMap, Method, StatusCode}; use lookup::lookup_v2::OptionalValuePath; use similar_asserts::assert_eq; @@ -506,6 +520,7 @@ mod tests { path_key: &'a str, path: &'a str, method: &'a str, + response_code: StatusCode, strict_path: bool, status: EventStatus, acknowledgements: bool, @@ -529,6 +544,7 @@ mod tests { headers, encoding: None, query_parameters, + response_code, tls: None, auth: None, strict_path, @@ -639,6 +655,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -683,6 +700,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -720,6 +738,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -751,6 +770,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -787,6 +807,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -830,6 +851,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -879,6 +901,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -964,6 +987,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -1005,6 +1029,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -1055,6 +1080,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -1084,6 +1110,7 @@ mod tests { "vector_http_path", "/event/path", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -1123,6 +1150,7 @@ mod tests { "vector_http_path", "/event", "POST", + StatusCode::OK, false, EventStatus::Delivered, true, @@ -1182,6 +1210,7 @@ mod tests { "vector_http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Delivered, true, @@ -1196,6 +1225,39 @@ mod tests { ); } + #[tokio::test] + async fn http_status_code() { + assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move { + let (rx, addr) = source( + vec![], + vec![], + "http_path", + "/", + "POST", + StatusCode::ACCEPTED, + true, + EventStatus::Delivered, + true, + None, + None, + ) + .await; + + spawn_collect_n( + async move { + assert_eq!( + StatusCode::ACCEPTED, + send(addr, "{\"key1\":\"value1\"}").await + ); + }, + rx, + 1, + ) + .await; + }) + .await; + } + #[tokio::test] async fn http_delivery_failure() { assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { @@ -1205,6 +1267,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Rejected, true, @@ -1234,6 +1297,7 @@ mod tests { "http_path", "/", "POST", + StatusCode::OK, true, EventStatus::Rejected, false, @@ -1265,6 +1329,7 @@ mod tests { "http_path", "/", "GET", + StatusCode::OK, true, EventStatus::Delivered, true, diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index fb8b84a3d724b..6fe08a32c7b51 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -80,6 +80,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig { self.address, "", HttpMethod::Post, + StatusCode::OK, true, &self.tls, &self.auth, diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 1087a65b81f40..3d96d1971c2f2 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -63,6 +63,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { address: SocketAddr, path: &str, method: HttpMethod, + response_code: StatusCode, strict_path: bool, tls: &Option, auth: &Option, @@ -152,7 +153,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { events }); - handle_request(events, acknowledgements, cx.out.clone()) + handle_request(events, acknowledgements, response_code, cx.out.clone()) }, ) .with(warp::trace(move |_info| span.clone())); @@ -205,6 +206,7 @@ impl warp::reject::Reject for RejectShuttingDown {} async fn handle_request( events: Result, ErrorMessage>, acknowledgements: bool, + response_code: StatusCode, mut out: SourceSender, ) -> Result { match events { @@ -219,7 +221,7 @@ async fn handle_request( emit!(StreamClosedError { count }); warp::reject::custom(RejectShuttingDown) }) - .and_then(|_| handle_batch_status(receiver)) + .and_then(|_| handle_batch_status(response_code, receiver)) .await } Err(error) => { @@ -230,12 +232,13 @@ async fn handle_request( } async fn handle_batch_status( + success_response_code: StatusCode, receiver: Option, ) -> Result { match receiver { - None => Ok(warp::reply()), + None => Ok(success_response_code), Some(receiver) => match receiver.await { - BatchStatus::Delivered => Ok(warp::reply()), + BatchStatus::Delivered => Ok(success_response_code), BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new( StatusCode::INTERNAL_SERVER_ERROR, "Error delivering contents to sink".into(), diff --git a/website/cue/reference/components/sources/base/http.cue b/website/cue/reference/components/sources/base/http.cue index 21671e40afd90..d47eca98423a7 100644 --- a/website/cue/reference/components/sources/base/http.cue +++ b/website/cue/reference/components/sources/base/http.cue @@ -338,6 +338,16 @@ base: components: sources: http: configuration: { items: type: string: examples: ["application", "source"] } } + response_code: { + description: "Specifies the HTTP response status code that will be returned on successful requests." + required: false + type: uint: { + default: 200 + examples: [ + 202, + ] + } + } strict_path: { description: """ Whether or not to treat the configured `path` as an absolute path. diff --git a/website/cue/reference/components/sources/base/http_server.cue b/website/cue/reference/components/sources/base/http_server.cue index 6ee325a66a02c..5d2e305b0dd08 100644 --- a/website/cue/reference/components/sources/base/http_server.cue +++ b/website/cue/reference/components/sources/base/http_server.cue @@ -338,6 +338,16 @@ base: components: sources: http_server: configuration: { items: type: string: examples: ["application", "source"] } } + response_code: { + description: "Specifies the HTTP response status code that will be returned on successful requests." + required: false + type: uint: { + default: 200 + examples: [ + 202, + ] + } + } strict_path: { description: """ Whether or not to treat the configured `path` as an absolute path.