
respect user provided timestamp #753

Merged · 2 commits · Jan 31, 2024

@@ -28,8 +28,10 @@ public class BulkIngestApi {
private final DatasetRateLimitingService datasetRateLimitingService;
private final MeterRegistry meterRegistry;
private final Counter incomingByteTotal;
private final Counter incomingDocsTotal;
private final Timer bulkIngestTimer;
private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";

public BulkIngestApi(
@@ -41,6 +43,7 @@ public BulkIngestApi(
this.datasetRateLimitingService = datasetRateLimitingService;
this.meterRegistry = meterRegistry;
this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
}

@@ -70,6 +73,7 @@ public HttpResponse addDocument(String bulkRequest) {
}

for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
incomingDocsTotal.increment(indexDocs.getValue().size());
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
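
The new docs counter follows the usual Micrometer pattern: register once against the MeterRegistry in the constructor, then increment by the batch size on the hot path. A minimal standalone sketch of that pattern, using a SimpleMeterRegistry rather than the server's registry (the batch size of 25 is illustrative):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class DocsCounterSketch {
  public static void main(String[] args) {
    MeterRegistry registry = new SimpleMeterRegistry();
    // Register once, as the BulkIngestApi constructor does.
    Counter incomingDocsTotal = registry.counter("kaldb_preprocessor_incoming_docs");
    // Increment by the number of docs in the batch, as addDocument now does.
    incomingDocsTotal.increment(25);
    System.out.println(incomingDocsTotal.count()); // 25.0
  }
}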

@@ -4,8 +4,11 @@
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -36,12 +39,43 @@ public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
}

protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
ZonedDateTime timestamp =
(ZonedDateTime)
/**
 * We need users to be able to specify the timestamp field and unit. For now we do the following:
 * 1. If a "timestamp" or "_timestamp" field exists, parse it as a long in epoch millis.
 * 2. If a "@timestamp" field exists, parse it as an ISO-8601 date string (since logstash sets that).
 * 3. Otherwise, use the current time from the ingestMetadata.
 */
public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
// assumption that the provided timestamp is in millis
// at some point both the unit and field need to be configurable
// when we do that, remember to change the caller to appropriately remove the field
if (ingestDocument.hasField("timestamp")) {
return ingestDocument.getFieldValue("timestamp", Long.class);
}

if (ingestDocument.hasField("_timestamp")) {
return ingestDocument.getFieldValue("_timestamp", Long.class);
}

if (ingestDocument.hasField("@timestamp")) {
String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
LocalDateTime localDateTime =
LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
return instant.toEpochMilli();
}

return ((ZonedDateTime)
ingestDocument
.getIngestMetadata()
.getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC));
.getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
.toInstant()
.toEpochMilli();
}

protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
@@ -56,15 +90,19 @@ protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
spanBuilder.setId(ByteString.copyFrom(id.getBytes()));
// Trace.Span proto expects duration in microseconds today
spanBuilder.setTimestamp(
TimeUnit.MICROSECONDS.convert(timestamp.toInstant().toEpochMilli(), TimeUnit.MILLISECONDS));
TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));

// Remove the following internal metadata fields that OpenSearch adds
sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName());
sourceAndMetadata.remove(IngestDocument.Metadata.VERSION_TYPE.getFieldName());
// these two fields don't need to be tags as they have been explicitly set already

// these fields don't need to be tags as they have been explicitly set already
sourceAndMetadata.remove(IngestDocument.Metadata.ID.getFieldName());
sourceAndMetadata.remove(IngestDocument.Metadata.INDEX.getFieldName());
sourceAndMetadata.remove("timestamp");
sourceAndMetadata.remove("_timestamp");
sourceAndMetadata.remove("@timestamp");

sourceAndMetadata.forEach(
(key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value)));
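
The resolution order documented above can be exercised end to end through parseRequest. Below is a minimal sketch, not a fixture from this PR: the NDJSON payload, the index name "test", and the 2024-01-01 value are illustrative, and the import for BulkApiRequestParser is omitted since its package path is not shown in the diff.

import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class UserTimestampSketch {
  public static void main(String[] args) throws Exception {
    // A _bulk NDJSON body whose document carries an epoch-millis "timestamp"
    // field; 1704067200000 is 2024-01-01T00:00:00Z. Note the trailing newline,
    // which the bulk format requires.
    byte[] postBody =
        ("{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }\n"
                + "{ \"field1\" : \"value1\", \"timestamp\" : 1704067200000 }\n")
            .getBytes(StandardCharsets.UTF_8);

    Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(postBody);

    // The user-provided millis win over the ingest-time default, and the span
    // timestamp is stored in microseconds, so this should print 1704067200000000.
    System.out.println(docs.get("test").get(0).getTimestamp());
  }
}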

@@ -15,6 +15,7 @@
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.index.VersionType;
import org.opensearch.ingest.IngestDocument;

public class BulkApiRequestParserTest {
@@ -28,13 +29,13 @@ private byte[] getRawQueryBytes(String filename) throws IOException {

@Test
public void testSimpleIndexRequest() throws Exception {
byte[] rawRequest = getRawQueryBytes("index_simple");
byte[] rawRequest = getRawQueryBytes("index_simple_with_ts");

List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
assertThat(indexRequests.get(0).index()).isEqualTo("test");
assertThat(indexRequests.get(0).id()).isEqualTo("1");
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2);
assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
Expand All @@ -51,6 +52,7 @@ public void testSimpleIndexRequest() throws Exception {
&& keyValue.getVStr().equals("test"))
.count())
.isEqualTo(1);
assertThat(indexDocs.get("test").get(0).getTimestamp()).isEqualTo(4739680479544000L);
}

@Test
@@ -210,5 +212,53 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
TimeUnit.MILLISECONDS.convert(span.getTimestamp(), TimeUnit.MICROSECONDS));
Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();

Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
}

@Test
public void testTimestampParsingFromIngestDocument() {
IngestDocument ingestDocument =
new IngestDocument("index", "1", "routing", 1L, VersionType.INTERNAL, Map.of());
long timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
Instant ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);

// this tests that the parser inserted a timestamp close to the current time
Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();

// We respect the user-provided @timestamp field
String ts = "2024-01-01T00:00:00Z";
Instant providedTimeStamp = Instant.parse(ts);
ingestDocument =
new IngestDocument(
"index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("@timestamp", ts));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());

ingestDocument =
new IngestDocument(
"index",
"1",
"routing",
1L,
VersionType.INTERNAL,
Map.of("timestamp", providedTimeStamp.toEpochMilli()));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());

ingestDocument =
new IngestDocument(
"index",
"1",
"routing",
1L,
VersionType.INTERNAL,
Map.of("_timestamp", providedTimeStamp.toEpochMilli()));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
}
}

@@ -284,7 +284,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0()
metricsRegistry, 10 * 1024 * 1024 * 1024L, 1000000L);

KaldbConfigs.IndexerConfig indexerConfig =
KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 0);
KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 10_000);
initChunkManager(
chunkRollOverStrategy,
S3_TEST_BUCKET,
@@ -312,8 +312,7 @@

// Get the count of the amount of indices so that we can confirm we've cleaned them up
// after the rollover
final File dataDirectory = new File(indexerConfig.getDataDirectory());
final File indexDirectory = new File(dataDirectory.getAbsolutePath() + "/indices");
final File indexDirectory = tmpPath.resolve("indices").toFile();

// files before rollover may or may-not be null, depending on other test timing
int filesBeforeRollover =

@@ -1,2 +1,2 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }
{ "field1" : "value1", "field2" : "value2"}

@@ -0,0 +1,2 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2", "@timestamp": "2120-03-12T09:54:39.544Z" }