Skip to content

Commit

Permalink
respect user provided timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Jan 27, 2024
1 parent 0adba74 commit 3032a5c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -36,12 +39,36 @@ public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
}

protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
ZonedDateTime timestamp =
(ZonedDateTime)
/**
* We need users to be able to specify the timestamp field and unit. For now we will do the
* following: 1. Check to see if the "timestamp" field exists and if it does parse that as a long
* in millis 2. Check if a field called `@timestamp` exists and parse that as a date (since
* logstash sets that) 3. Use the current time from the ingestMetadata
*/
public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
if (ingestDocument.hasField("timestamp") || ingestDocument.hasField("_timestamp")) {
return ingestDocument.getFieldValue("timestamp", Long.class);
}

if (ingestDocument.hasField("@timestamp")) {
String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
LocalDateTime localDateTime =
LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
return instant.toEpochMilli();
}

return ((ZonedDateTime)
ingestDocument
.getIngestMetadata()
.getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC));
.getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
.toInstant()
.toEpochMilli();
}

protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
Expand All @@ -56,7 +83,7 @@ protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
spanBuilder.setId(ByteString.copyFrom(id.getBytes()));
// Trace.Span proto expects duration in microseconds today
spanBuilder.setTimestamp(
TimeUnit.MICROSECONDS.convert(timestamp.toInstant().toEpochMilli(), TimeUnit.MILLISECONDS));
TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));

// Remove the following internal metadata fields that OpenSearch adds
sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ingest.IngestDocument;
Expand All @@ -34,15 +31,15 @@ public void testSimpleIndexRequest() throws Exception {
assertThat(indexRequests.size()).isEqualTo(1);
assertThat(indexRequests.get(0).index()).isEqualTo("test");
assertThat(indexRequests.get(0).id()).isEqualTo("1");
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2);
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);

assertThat(indexDocs.get("test").get(0).getId().toStringUtf8()).isEqualTo("1");
assertThat(indexDocs.get("test").get(0).getTagsList().size()).isEqualTo(3);
assertThat(indexDocs.get("test").get(0).getTagsList().size()).isEqualTo(4);
assertThat(
indexDocs.get("test").get(0).getTagsList().stream()
.filter(
Expand All @@ -51,6 +48,7 @@ public void testSimpleIndexRequest() throws Exception {
&& keyValue.getVStr().equals("test"))
.count())
.isEqualTo(1);
assertThat(indexDocs.get("test").get(0).getTimestamp()).isEqualTo(4739680479544000L);
}

@Test
Expand Down Expand Up @@ -205,10 +203,6 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument);

// timestamp is in microseconds based on the trace.proto definition
Instant ingestDocumentTime =
Instant.ofEpochMilli(
TimeUnit.MILLISECONDS.convert(span.getTimestamp(), TimeUnit.MICROSECONDS));
Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
assertThat(span.getTimestamp()).isEqualTo(4739680479544000L);
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }
{ "field1" : "value1", "field2" : "value2", "@timestamp": "2120-03-12T09:54:39.544Z" }

0 comments on commit 3032a5c

Please sign in to comment.