Skip to content

Commit

Permalink
Added stringtodate and datetostring UDFs (#1851)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andy Bryant authored and andybryant committed Sep 27, 2018
1 parent 965e1e4 commit d425cae
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 9 deletions.
18 changes: 16 additions & 2 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,14 @@ Scalar functions
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| CONCAT | ``CONCAT(col1, '_hello')`` | Concatenate two strings. |
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| DATETOSTRING | ``DATETOSTRING(START_DATE, 'yyyy-MM-dd')`` | Converts an INT date value into |
| | | the string representation of the date in |
| | | the given format. Single quotes in the |
| | | date format can be escaped with '', for |
| | | example: 'yyyy-MM-dd''T'''. |
| | | The integer represents days since epoch |
| | | matching the encoding used by Kafka Connect dates.|
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| EXTRACTJSONFIELD | ``EXTRACTJSONFIELD(message, '$.log.cloud')`` | Given a string column in JSON format, extract |
| | | the field that matches. |
| | | |
Expand Down Expand Up @@ -1119,7 +1127,7 @@ Scalar functions
| | | will return ``My Txxx--nnn``. |
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| MASK_KEEP_RIGHT | ``MASK_KEEP_RIGHT(col1, numChars, 'X', 'x', 'n', '-')`` | Similar to the ``MASK`` function above, except |
| | | that the last or rightt-most ``numChars`` |
| | | that the last or right-most ``numChars`` |
| | | characters will not be masked in any way. |
| | | For example:``MASK_KEEP_RIGHT("My Test $123", 4)``|
| | | will return ``Xx-Xxxx-$123``. |
Expand All @@ -1131,7 +1139,7 @@ Scalar functions
| | | will return ``Xx-Xest $123``. |
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| MASK_RIGHT | ``MASK_RIGHT(col1, numChars, 'X', 'x', 'n', '-')`` | Similar to the ``MASK`` function above, except |
| | | that only the last or rightt-most ``numChars`` |
| | | that only the last or right-most ``numChars`` |
| | | characters will have any masking applied to them. |
| | | For example: ``MASK_RIGHT("My Test $123", 4)`` |
| | | will return ``My Test -nnn``. |
Expand All @@ -1140,6 +1148,12 @@ Scalar functions
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| ROUND | ``ROUND(col1)`` | Round a value to the nearest BIGINT value. |
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| STRINGTODATE | ``STRINGTODATE(col1, 'yyyy-MM-dd')`` | Converts a string value in the given |
| | | format into the INT value |
| | | that represents days since epoch. Single |
| | | quotes in the date format can be escaped with|
| | | '', for example: 'yyyy-MM-dd''T'''. |
+------------------------+------------------------------------------------------------+---------------------------------------------------+
| STRINGTOTIMESTAMP | ``STRINGTOTIMESTAMP(col1, 'yyyy-MM-dd HH:mm:ss.SSS')`` | Converts a string value in the given |
| | | format into the BIGINT value |
| | | that represents the millisecond timestamp. Single |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@UdfDescription(name = "datetostring", author = "Confluent",
    description = "Converts an integer representing days since epoch to a date string using the given format pattern."
        + " Note this is the format Kafka Connect uses to represent dates with no time component."
        + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
public class DateToString {

  /**
   * Formatters keyed by their pattern string. Entries are created on first use via
   * {@link DateTimeFormatter#ofPattern(String)} and the cache holds at most 1000 of them.
   */
  private final LoadingCache<String, DateTimeFormatter> formatters =
      CacheBuilder.newBuilder()
          .maximumSize(1000)
          .build(CacheLoader.from(DateTimeFormatter::ofPattern));

  /**
   * Renders a days-since-epoch value as text.
   *
   * @param daysSinceEpoch number of days relative to 1970-01-01; it is unboxed, so it must not
   *     be {@code null}
   * @param formatPattern a {@link DateTimeFormatter} pattern, e.g. {@code yyyy-MM-dd}
   * @return the date formatted according to {@code formatPattern}
   */
  @Udf(description = "Converts an integer representing days since epoch to a string using the given format pattern."
      + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
  public String dateToString(final Integer daysSinceEpoch, final String formatPattern) {
    // The formatter is resolved before the date is built, so a bad pattern is reported first.
    return formatters.getUnchecked(formatPattern).format(LocalDate.ofEpochDay(daysSinceEpoch));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@UdfDescription(name = "stringtodate", author = "Confluent",
    description = "Converts a string representation of a date into an integer representing"
        + " days since epoch using the given format pattern."
        + " Note this is the format Kafka Connect uses to represent dates with no time component."
        + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
public class StringToDate {

  /**
   * Formatters keyed by their pattern string. Entries are created on first use via
   * {@link DateTimeFormatter#ofPattern(String)} and the cache holds at most 1000 of them.
   */
  private final LoadingCache<String, DateTimeFormatter> formatters =
      CacheBuilder.newBuilder()
          .maximumSize(1000)
          .build(CacheLoader.from(DateTimeFormatter::ofPattern));

  /**
   * Parses a formatted date and returns it as a day count relative to 1970-01-01.
   *
   * @param formattedDate the text to parse
   * @param formatPattern a {@link DateTimeFormatter} pattern, e.g. {@code yyyy-MM-dd}
   * @return the number of days since epoch for the parsed date
   * @throws java.time.format.DateTimeParseException if {@code formattedDate} does not match
   *     {@code formatPattern}
   * @throws ArithmeticException if the parsed date lies so far from the epoch that its day count
   *     does not fit in an {@code int}
   */
  @Udf(description = "Converts a string representation of a date into an integer representing"
      + " days since epoch using the given format pattern."
      + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
  public int stringToDate(final String formattedDate, final String formatPattern) {
    final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
    final long epochDay = LocalDate.parse(formattedDate, formatter).toEpochDay();
    // A plain (int) cast would silently wrap for extreme years (e.g. "+5881581-01-01") and
    // yield a wrong date; fail loudly instead.
    return Math.toIntExact(epochDay);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.ksql.function.KsqlFunctionException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Unit tests for {@link DateToString}. Expected values are produced independently with
 * {@link SimpleDateFormat} pinned to UTC.
 */
public class DateToStringTest {

  private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
  private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

  private DateToString udf;

  @Rule
  public final ExpectedException expectedException = ExpectedException.none();

  @Before
  public void setUp() {
    udf = new DateToString();
  }

  @Test
  public void shouldConvertDateToString() {
    // Given:
    final int epochDay = 16383;
    final String pattern = "yyyy-MM-dd";

    // When:
    final String actual = udf.dateToString(epochDay, pattern);

    // Then:
    assertThat(actual, is(expectedResult(epochDay, pattern)));
  }

  @Test
  public void shouldRoundTripWithStringToDate() {
    final String pattern = "dd/MM/yyyy'Freya'";
    final StringToDate parser = new StringToDate();
    IntStream.range(-10_000, 20_000)
        .parallel()
        .forEach(epochDay -> {
          // Format, compare against the reference, then parse back to the same day number.
          final String formatted = udf.dateToString(epochDay, pattern);
          assertThat(formatted, is(expectedResult(epochDay, pattern)));

          final int parsedBack = parser.stringToDate(formatted, pattern);
          assertThat(parsedBack, is(epochDay));
        });
  }

  @Test
  public void shouldSupportEmbeddedChars() {
    // Given:
    final int epochDay = 12345;
    final String pattern = "yyyy-dd-MM'Fred'";

    // When:
    final String actual = udf.dateToString(epochDay, pattern);

    // Then:
    assertThat(actual, is(expectedResult(epochDay, pattern)));
  }

  @Test
  public void shouldThrowIfFormatInvalid() {
    // Then (declared up front, as ExpectedException requires):
    expectedException.expect(UncheckedExecutionException.class);
    expectedException.expectMessage("Unknown pattern letter: i");

    // When:
    udf.dateToString(44444, "invalid");
  }

  @Test
  public void shouldBeThreadSafe() {
    IntStream.range(0, 10_000)
        .parallel()
        .forEach(iteration -> {
          shouldConvertDateToString();
          udf.dateToString(55555, "yyyy-MM-dd");
        });
  }

  @Test
  public void shouldWorkWithManyDifferentFormatters() {
    IntStream.range(0, 10_000)
        .parallel()
        .forEach(n -> {
          try {
            // Each index yields a distinct pattern, so each call needs its own formatter.
            final String uniquePattern = "yyyy-MM-dd'X" + n + "'";
            final String actual = udf.dateToString(n, uniquePattern);
            assertThat(actual, is(expectedResult(n, uniquePattern)));
          } catch (final Exception e) {
            Assert.fail(e.getMessage());
          }
        });
  }

  /**
   * Reference implementation: formats midnight UTC of the given epoch day with
   * {@link SimpleDateFormat}.
   */
  private String expectedResult(final int daysSinceEpoch, final String formatPattern) {
    final SimpleDateFormat utcFormat = new SimpleDateFormat(formatPattern);
    utcFormat.setCalendar(Calendar.getInstance(UTC));
    final Date midnight = new Date(daysSinceEpoch * MILLIS_PER_DAY);
    return utcFormat.format(midnight);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.ksql.function.KsqlFunctionException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.stream.IntStream;
import org.apache.kafka.connect.errors.DataException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StringToDate}. Expected values are produced independently with
 * {@link SimpleDateFormat} and {@link Calendar} pinned to UTC.
 */
public class StringToDateTest {

  private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
  private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

  private StringToDate udf;

  @Before
  public void setUp() {
    udf = new StringToDate();
  }

  @Test
  public void shouldConvertStringToDate() throws ParseException {
    // Given:
    final String text = "2021-12-01";
    final String pattern = "yyyy-MM-dd";

    // When:
    final int actual = udf.stringToDate(text, pattern);

    // Then:
    assertThat(actual, is(expectedResult(text, pattern)));
  }

  @Test
  public void shouldSupportEmbeddedChars() throws ParseException {
    // Given:
    final String text = "2021-12-01Fred";
    final String pattern = "yyyy-MM-dd'Fred'";

    // When:
    final int actual = udf.stringToDate(text, pattern);

    // Then:
    assertThat(actual, is(expectedResult(text, pattern)));
  }

  @Test(expected = UncheckedExecutionException.class)
  public void shouldThrowIfFormatInvalid() {
    udf.stringToDate("2021-12-01", "invalid");
  }

  @Test(expected = DateTimeParseException.class)
  public void shouldThrowIfParseFails() {
    udf.stringToDate("invalid", "yyyy-MM-dd");
  }

  @Test(expected = DateTimeParseException.class)
  public void shouldThrowOnEmptyString() {
    udf.stringToDate("", "yyyy-MM-dd");
  }

  @Test
  public void shouldBeThreadSafe() {
    IntStream.range(0, 10_000)
        .parallel()
        .forEach(iteration -> {
          try {
            shouldConvertStringToDate();
          } catch (final Exception e) {
            Assert.fail(e.getMessage());
          }
          udf.stringToDate("1988-01-12", "yyyy-MM-dd");
        });
  }

  @Test
  public void shouldWorkWithManyDifferentFormatters() {
    IntStream.range(0, 10_000)
        .parallel()
        .forEach(n -> {
          try {
            // Each index yields a distinct pattern, so each call needs its own formatter.
            final String text = "2021-12-01X" + n;
            final String uniquePattern = "yyyy-MM-dd'X" + n + "'";
            final int actual = udf.stringToDate(text, uniquePattern);
            assertThat(actual, is(expectedResult(text, uniquePattern)));
          } catch (final Exception e) {
            Assert.fail(e.getMessage());
          }
        });
  }


  /**
   * Reference implementation: parses the text with {@link SimpleDateFormat} in UTC, fails the
   * test if the result carries any time-of-day component, and converts the instant to whole days.
   */
  private int expectedResult(final String formattedDate, final String formatPattern) throws ParseException {
    final SimpleDateFormat utcFormat = new SimpleDateFormat(formatPattern);
    utcFormat.setCalendar(Calendar.getInstance(UTC));
    final Date parsed = utcFormat.parse(formattedDate);

    final Calendar utcCalendar = Calendar.getInstance(UTC);
    utcCalendar.setTime(parsed);

    final boolean atMidnight = utcCalendar.get(Calendar.HOUR_OF_DAY) == 0
        && utcCalendar.get(Calendar.MINUTE) == 0
        && utcCalendar.get(Calendar.SECOND) == 0
        && utcCalendar.get(Calendar.MILLISECOND) == 0;
    if (!atMidnight) {
      fail("Date should not have any time fields set to non-zero values.");
    }
    return (int) (utcCalendar.getTimeInMillis() / MILLIS_PER_DAY);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void setUp(){
}

@Test
public void shouldCovertStringToTimestamp() throws ParseException {
public void shouldConvertStringToTimestamp() throws ParseException {
// When:
final Object result = udf.evaluate("2021-12-01 12:10:11.123", "yyyy-MM-dd HH:mm:ss.SSS");

Expand Down Expand Up @@ -80,7 +80,7 @@ public void shouldThrowIfParseFails() {

@Test(expected = KsqlFunctionException.class)
public void shouldThrowOnEmptyString() {
udf.evaluate("invalid", "yyyy-MM-dd'T'HH:mm:ss.SSS");
udf.evaluate("", "yyyy-MM-dd'T'HH:mm:ss.SSS");
}

@Test
Expand All @@ -89,7 +89,7 @@ public void shouldBeThreadSafe() {
.parallel()
.forEach(idx -> {
try {
shouldCovertStringToTimestamp();
shouldConvertStringToTimestamp();
} catch (final ParseException e) {
Assert.fail(e.getMessage());
}
Expand Down
Loading

0 comments on commit d425cae

Please sign in to comment.