Skip to content

Commit

Permalink
convert timestamp&fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
caibirdme committed Jul 15, 2022
1 parent 1927594 commit e8cffd0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
18 changes: 8 additions & 10 deletions src/opentelemetry/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ use super::{
Resource as OtelResource,
};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use ordered_float::NotNan;
use std::{
collections::BTreeMap,
time::{SystemTime, UNIX_EPOCH},
};
use std::collections::BTreeMap;
use value::Value;
use vector_core::{
config::log_schema,
Expand Down Expand Up @@ -119,15 +117,15 @@ impl From<ResourceLog> for Event {
}

// NOT optional fields
le.insert(log_schema().timestamp_key(), rl.log_record.time_unix_nano);
le.insert(
log_schema().timestamp_key(),
Utc.timestamp_nanos(rl.log_record.time_unix_nano as i64),
);
// according to proto, if observed_time_unix_nano is missing, collector should set it
let observed_timestamp = if rl.log_record.observed_time_unix_nano > 0 {
rl.log_record.observed_time_unix_nano
Utc.timestamp_nanos(rl.log_record.observed_time_unix_nano as i64)
} else {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_nanos() as u64
Utc::now()
};
le.insert(OBSERVED_TIME_UNIX_NANO_KEY, observed_timestamp);
le.insert(
Expand Down
69 changes: 48 additions & 21 deletions src/sources/opentelemetry/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn handle_batch_status(receiver: Option<BatchStatusReceiver>) -> Result<()
mod tests {
use super::*;
use crate::{
event::{into_event_stream, Event, EventStatus, LogEvent, Value},
opentelemetry::{
Common::{any_value, AnyValue, KeyValue},
LogService::logs_service_client::LogsServiceClient,
Expand All @@ -151,17 +152,20 @@ mod tests {
},
SourceSender,
};
use chrono::{TimeZone, Utc};
use futures::Stream;
use futures_util::StreamExt;
use std::collections::BTreeMap;

#[tokio::test]
async fn receive_message() {
assert_source_compliance(&SOURCE_TAGS, async {
let addr = test_util::next_addr();
let config = format!(r#"address = "{}""#, addr);
let source: OpentelemetryConfig = toml::from_str(&config).unwrap();

let (tx, rx) = SourceSender::new_test();
let (sender, logs_output, _) = new_source(EventStatus::Delivered);
let server = source
.build(SourceContext::new_test(tx, None))
.build(SourceContext::new_test(sender, None))
.await
.unwrap();
tokio::spawn(server);
Expand Down Expand Up @@ -210,33 +214,56 @@ mod tests {
}],
});
let _ = client.export(req).await;
let mut output = test_util::collect_ready(rx).await;
let mut output = test_util::collect_ready(logs_output).await;
// we just send one, so only one output
assert_eq!(output.len(), 1);
let actual_event = output.pop().unwrap();
let expect = r#"
{
"attributes": {"attr_key": "attr_val"},
"resources": {"res_key": "res_val"},
"message": "log body",
"trace_id": "4ac52aadf321c2e531db005df08792f5",
"span_id": "0b9e4bda2a55530d",
"severity_number": 9,
"severity_text": "info",
"flags": 4,
"timestamp": 1,
"observed_time_unix_nano": 2,
"dropped_attributes_count": 3
}
"#;
let expect_json: serde_json::Value = serde_json::from_str(expect).unwrap();
let expect_event = Event::try_from(expect_json).unwrap();
let expect_vec = vec_into_btmap(vec![
(
"attributes",
Value::Object(vec_into_btmap(vec![("attr_key", "attr_val".into())])),
),
(
"resources",
Value::Object(vec_into_btmap(vec![("res_key", "res_val".into())])),
),
("message", "log body".into()),
("trace_id", "4ac52aadf321c2e531db005df08792f5".into()),
("span_id", "0b9e4bda2a55530d".into()),
("severity_number", 9.into()),
("severity_text", "info".into()),
("flags", 4.into()),
("dropped_attributes_count", 3.into()),
("timestamp", Utc.timestamp_nanos(1).into()),
("observed_time_unix_nano", Utc.timestamp_nanos(2).into()),
]);
let expect_event = Event::from(LogEvent::from(expect_vec));
assert_eq!(actual_event, expect_event);
})
.await;
}
fn new_source(
status: EventStatus,
) -> (
SourceSender,
impl Stream<Item = Event>,
impl Stream<Item = Event>,
) {
let (mut sender, recv) = SourceSender::new_test_finalize(status);
let logs_output = sender
.add_outputs(status, LOGS.to_string())
.flat_map(into_event_stream);
(sender, logs_output, recv)
}
fn str_into_hex_bytes(s: &str) -> Vec<u8> {
// unwrap is okay in test
hex::decode(s).unwrap()
}
fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> BTreeMap<String, Value> {
BTreeMap::from_iter(
arr.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<Vec<(_, _)>>(),
)
}
}

0 comments on commit e8cffd0

Please sign in to comment.