Skip to content

Commit

Permalink
Test that the init hook uses the source ID of the transform
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg committed Jul 5, 2023
1 parent 95167bb commit 987df7c
Showing 1 changed file with 64 additions and 15 deletions.
79 changes: 64 additions & 15 deletions src/transforms/lua/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ mod tests {
use tokio_stream::wrappers::ReceiverStream;

use super::*;
use crate::test_util::components::assert_transform_compliance;
use crate::test_util::{components::assert_transform_compliance, random_string};
use crate::transforms::test::create_topology;
use crate::{
event::{
Expand Down Expand Up @@ -514,17 +514,57 @@ mod tests {
.await
}

async fn next_event(out: &Arc<Mutex<Receiver<Event>>>) -> Event {
async fn next_event(out: &Arc<Mutex<Receiver<Event>>>, source: &str) -> Event {
let event = out
.lock()
.await
.recv()
.await
.expect("Event was not received");
assert_eq!(event.source_id(), Some(&Arc::new(ComponentKey::from("in"))));
assert_eq!(
event.source_id(),
Some(&Arc::new(ComponentKey::from(source)))
);
event
}

#[tokio::test]
async fn lua_runs_init_hook() {
let line1 = random_string(9);
run_transform(
&format!(
r#"
version = "2"
hooks.init = """function (emit)
event = {{log={{message="{line1}"}}}}
emit(event)
end
"""
hooks.process = """function (event, emit)
emit(event)
end
"""
"#
),
|tx, out| async move {
let line2 = random_string(9);
tx.send(Event::Log(LogEvent::from(line2.as_str())))
.await
.unwrap();
drop(tx);
assert_eq!(
next_event(&out, "transform").await.as_log()["message"],
line1.into()
);
assert_eq!(
next_event(&out, "in").await.as_log()["message"],
line2.into(),
);
},
)
.await;
}

#[tokio::test]
async fn lua_add_field() {
run_transform(
Expand All @@ -540,7 +580,10 @@ mod tests {
let event = Event::Log(LogEvent::from("program me"));
tx.send(event).await.unwrap();

assert_eq!(next_event(&out).await.as_log()["hello"], "goodbye".into());
assert_eq!(
next_event(&out, "in").await.as_log()["hello"],
"goodbye".into()
);
},
)
.await;
Expand All @@ -562,7 +605,7 @@ mod tests {
let event = Event::Log(LogEvent::from("Hello, my name is Bob."));
tx.send(event).await.unwrap();

assert_eq!(next_event(&out).await.as_log()["name"], "Bob".into());
assert_eq!(next_event(&out, "in").await.as_log()["name"], "Bob".into());
},
)
.await;
Expand All @@ -585,7 +628,7 @@ mod tests {

tx.send(event.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_log().get("name"), None);
assert_eq!(next_event(&out, "in").await.as_log().get("name"), None);
},
)
.await;
Expand Down Expand Up @@ -653,7 +696,10 @@ mod tests {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_log()["result"], "empty".into());
assert_eq!(
next_event(&out, "in").await.as_log()["result"],
"empty".into()
);
},
)
.await;
Expand All @@ -674,7 +720,10 @@ mod tests {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_log()["number"], Value::Integer(3));
assert_eq!(
next_event(&out, "in").await.as_log()["number"],
Value::Integer(3)
);
},
)
.await;
Expand All @@ -696,7 +745,7 @@ mod tests {
tx.send(event.into()).await.unwrap();

assert_eq!(
next_event(&out).await.as_log()["number"],
next_event(&out, "in").await.as_log()["number"],
Value::from(3.14159)
);
},
Expand All @@ -720,7 +769,7 @@ mod tests {
tx.send(event.into()).await.unwrap();

assert_eq!(
next_event(&out).await.as_log()["bool"],
next_event(&out, "in").await.as_log()["bool"],
Value::Boolean(true)
);
},
Expand All @@ -743,7 +792,7 @@ mod tests {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_log().get("junk"), None);
assert_eq!(next_event(&out, "in").await.as_log().get("junk"), None);
},
)
.await;
Expand Down Expand Up @@ -789,7 +838,7 @@ mod tests {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_log().get("result"), None);
assert_eq!(next_event(&out, "in").await.as_log().get("result"), None);
},
)
.await;
Expand Down Expand Up @@ -873,7 +922,7 @@ mod tests {
tx.send(event.into()).await.unwrap();

assert_eq!(
next_event(&out).await.as_log()["\"new field\""],
next_event(&out, "in").await.as_log()["\"new field\""],
"new value".into()
);
},
Expand All @@ -900,7 +949,7 @@ mod tests {
event.insert("friend", "Alice");
tx.send(event.into()).await.unwrap();

let output = next_event(&out).await;
let output = next_event(&out, "in").await;

assert_eq!(output.as_log()["name"], "nameBob".into());
assert_eq!(output.as_log()["friend"], "friendAlice".into());
Expand Down Expand Up @@ -936,7 +985,7 @@ mod tests {

tx.send(metric.into()).await.unwrap();

assert_eq!(next_event(&out).await.as_metric(), &expected);
assert_eq!(next_event(&out, "in").await.as_metric(), &expected);
},
)
.await;
Expand Down

0 comments on commit 987df7c

Please sign in to comment.