Skip to content

Commit

Permalink
Added support for 'epoch_micro' in date processor (#4098)
Browse files Browse the repository at this point in the history
* Added support for 'epoch_micro' in date processor

Signed-off-by: Utkarsh Agarwal <[email protected]>

* Added support for 'epoch_micro' in date processor

Signed-off-by: Utkarsh Agarwal <[email protected]>

---------

Signed-off-by: Utkarsh Agarwal <[email protected]>
Co-authored-by: Utkarsh Agarwal <[email protected]>
  • Loading branch information
Utkarsh-Aga and Utkarsh Agarwal authored Feb 13, 2024
1 parent 3da1696 commit 72e5a92
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 3 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/date-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ valid key and at least one pattern is required if match is configured.
* `patterns`: List of possible patterns the timestamp value of key can have. The patterns are based on sequence of letters and symbols.
The `patterns` support all the patterns listed in Java
[DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
and also supports `epoch_second`, `epoch_milli` and `epoch_nano` values which represents the timestamp as the number of seconds, milliseconds and nano seconds since epoch. Epoch values are always UTC time zone.
and also supports `epoch_second`, `epoch_milli`, `epoch_micro` and `epoch_nano` values which represents the timestamp as the number of seconds, milliseconds, microseconds and nano seconds since epoch. Epoch values are always UTC time zone.
* Type: `List<String>`

The following example of date configuration will use `timestamp` key to match against given patterns and stores the timestamp in ISO 8601
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event
private static final String OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final int LENGTH_OF_EPOCH_IN_MILLIS = 13;
private static final int LENGTH_OF_EPOCH_SECONDS = 10;

private static final int LENGTH_OF_EPOCH_MICROSECONDS = 16;
static final String DATE_PROCESSING_MATCH_SUCCESS = "dateProcessingMatchSuccess";
static final String DATE_PROCESSING_MATCH_FAILURE = "dateProcessingMatchFailure";

Expand Down Expand Up @@ -165,6 +165,9 @@ private Pair<String, Instant> getEpochFormatOutput(Instant time) {
return Pair.of(Long.toString(time.getEpochSecond()), time);
} else if (outputFormat.equals("epoch_milli")) {
return Pair.of(Long.toString(time.toEpochMilli()), time);
} else if (outputFormat.equals("epoch_micro")) {
long micro = (long)time.getEpochSecond() * 1000_000 + (long) time.getNano() / 1000;
return Pair.of(Long.toString(micro), time);
} else { // epoch_nano. validation for valid epoch_ should be
// done at init time
long nano = (long)time.getEpochSecond() * 1000_000_000 + (long) time.getNano();
Expand All @@ -187,13 +190,20 @@ private Pair<String, Instant> getFormattedDateTimeString(final String sourceTime
}
if (numberValue != null) {
int timestampLength = sourceTimestamp.length();
if (timestampLength > LENGTH_OF_EPOCH_IN_MILLIS) {
if (timestampLength > LENGTH_OF_EPOCH_MICROSECONDS) {
if (epochFormatters.contains("epoch_nano")) {
epochTime = Instant.ofEpochSecond(numberValue/1000_000_000, numberValue % 1000_000_000);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_nano is expected but not present in the patterns list");
return null;
}
} else if (timestampLength > LENGTH_OF_EPOCH_IN_MILLIS) {
if (epochFormatters.contains("epoch_micro")) {
epochTime = Instant.ofEpochSecond(numberValue/1000_000, (numberValue % 1000_000) * 1000);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_micro is expected but not present in the patterns list");
return null;
}
} else if (timestampLength > LENGTH_OF_EPOCH_SECONDS) {
if (epochFormatters.contains("epoch_milli")) {
epochTime = Instant.ofEpochMilli(numberValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public boolean isValidPatterns() {
public static boolean isValidPattern(final String pattern) {
if (pattern.equals("epoch_second") ||
pattern.equals("epoch_milli") ||
pattern.equals("epoch_micro") ||
pattern.equals("epoch_nano")) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAcce
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_micro");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,15 @@ private static Stream<Arguments> getInputOutputFormats() {
Random random = new Random();
long millis = random.nextInt(1000);
long nanos = random.nextInt(1000_000_000);
long micros = random.nextInt(1000_000);
long epochMillis = epochSeconds * 1000L + millis;
long epochNanos = epochSeconds * 1000_000_000L + nanos;
long epochMicros = epochSeconds * 1000_000L + micros;

ZonedDateTime zdtSeconds = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), java.time.ZoneId.systemDefault());
ZonedDateTime zdtMillis = ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), java.time.ZoneId.systemDefault());
ZonedDateTime zdtNanos = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, nanos), java.time.ZoneId.systemDefault());
ZonedDateTime zdtMicros = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, micros * 1000), java.time.ZoneId.systemDefault());
String testFormat = "yyyy-MMM-dd HH:mm:ss.SSS";
String testNanosFormat = "yyyy-MMM-dd HH:mm:ss.nnnnnnnnnXXX";
String defaultFormat = DateProcessorConfig.DEFAULT_OUTPUT_FORMAT;
Expand All @@ -305,9 +308,14 @@ private static Stream<Arguments> getInputOutputFormats() {
arguments("epoch_nano", epochNanos, "epoch_milli", Long.toString(epochNanos/1000_000)),
arguments("epoch_nano", epochNanos, testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat))),
arguments("epoch_nano", epochNanos, defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat))),
arguments("epoch_micro", epochMicros, "epoch_second", Long.toString(epochSeconds)),
arguments("epoch_micro", epochMicros, "epoch_milli", Long.toString(epochMicros/1000)),
arguments("epoch_micro", epochMicros, testFormat, zdtMicros.format(DateTimeFormatter.ofPattern(testFormat))),
arguments("epoch_micro", epochMicros, defaultFormat, zdtMicros.format(DateTimeFormatter.ofPattern(defaultFormat))),
arguments(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_second", Long.toString(epochSeconds)),
arguments(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_milli", Long.toString(epochNanos/1000_000)),
arguments(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_nano", Long.toString(epochNanos)),
arguments(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_micro", Long.toString(epochNanos/1000)),
arguments(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat)))
);
}
Expand Down

0 comments on commit 72e5a92

Please sign in to comment.