Skip to content

Commit

Permalink
feat(sources): http_server accepts query parameter wildcards
Browse files Browse the repository at this point in the history
Much like was done for headers, specifying which `query_parameters` to
be saved into the event should support wildcards.

Ref: LOG-19105
  • Loading branch information
darinspivey committed Jan 24, 2024
1 parent 733f3a9 commit 6627a95
Showing 1 changed file with 130 additions and 14 deletions.
144 changes: 130 additions & 14 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use crate::{
event::{Event, Value},
register_validatable_component,
serde::{bool_or_struct, default_decoding},
sources::util::{
http::{add_query_parameters, HttpMethod},
Encoding, ErrorMessage, HttpSource, HttpSourceAuthConfig,
},
sources::util::{http::HttpMethod, Encoding, ErrorMessage, HttpSource, HttpSourceAuthConfig},
tls::TlsEnableableConfig,
};

Expand Down Expand Up @@ -353,7 +350,10 @@ impl SourceConfig for SimpleHttpConfig {

let source = SimpleHttpSource {
headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"),
query_parameters: build_param_matcher(&remove_duplicates(
self.query_parameters.clone(),
"query_parameters",
))?,
path_key: self.path_key.clone(),
decoder,
log_namespace,
Expand Down Expand Up @@ -399,7 +399,7 @@ impl SourceConfig for SimpleHttpConfig {
#[derive(Clone)]
struct SimpleHttpSource {
headers: Vec<HttpConfigParamKind>,
query_parameters: Vec<String>,
query_parameters: Vec<HttpConfigParamKind>,
path_key: OptionalValuePath,
decoder: Decoder,
log_namespace: LogNamespace,
Expand Down Expand Up @@ -472,6 +472,47 @@ impl HttpSource for SimpleHttpSource {
};
}

for q in &self.query_parameters {
match q {
// Same as headers, add non-wildcard query parameters if they are found
HttpConfigParamKind::Exact(query_parameter_name) => {
let value = query_parameters.get(query_parameter_name);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(query_parameter_name))),
path!("query_parameters", query_parameter_name),
Value::from(value.map(String::to_owned)),
);
}
// Add all query parameters that match against wildcard pattens specified
HttpConfigParamKind::Glob(query_parameter_pattern) => {
for query_parameter_name in query_parameters.keys() {
if query_parameter_pattern.matches_with(
query_parameter_name.as_str(),
glob::MatchOptions::default(),
) {
let value = query_parameters.get(query_parameter_name);

self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!(
query_parameter_name.as_str()
))),
path!(
"query_parameters",
query_parameter_name.as_str()
),
Value::from(value.map(String::to_owned)),
);
}
}
}
};
}

self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
Expand All @@ -483,14 +524,6 @@ impl HttpSource for SimpleHttpSource {
}
}
}

add_query_parameters(
events,
&self.query_parameters,
query_parameters,
self.log_namespace,
SimpleHttpConfig::NAME,
);
}

fn build_events(
Expand Down Expand Up @@ -1155,6 +1188,89 @@ mod tests {
}
}

#[tokio::test]
async fn http_query_parameters_wildcard_all() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec!["*".to_string()],
"http_path",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;

spawn_ok_collect_n(
send_with_query(
addr,
"{\"key1\":\"value1\"}",
"source=staging&region=gb&status=200",
),
rx,
1,
)
.await
})
.await;

{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["source"], "staging".into());
assert_eq!(log["region"], "gb".into());
assert_eq!(log["status"], "200".into());
assert_event_metadata(log).await;
}
}

#[tokio::test]
async fn http_query_parameters_wildcard_some() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec!["s*".to_string()],
"http_path",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;

spawn_ok_collect_n(
send_with_query(
addr,
"{\"key1\":\"value1\"}",
"source=staging&region=gb&status=200",
),
rx,
1,
)
.await
})
.await;

{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["source"], "staging".into());
assert_eq!(log["status"], "200".into());
assert_event_metadata(log).await;
}
}

#[tokio::test]
async fn http_gzip_deflate() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
Expand Down

0 comments on commit 6627a95

Please sign in to comment.