diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 0ceb43de8681..eacdac684a1b 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -1071,6 +1071,14 @@ Scalar functions +------------------------+------------------------------------------------------------+---------------------------------------------------+ | CONCAT | ``CONCAT(col1, '_hello')`` | Concatenate two strings. | +------------------------+------------------------------------------------------------+---------------------------------------------------+ +| DATETOSTRING | ``DATETOSTRING(START_DATE, 'yyyy-MM-dd')`` | Converts an INT date value into | +| | | the string representation of the date in | +| | | the given format. Single quotes in the | +| | | timestamp format can be escaped with '', for | +| | | example: 'yyyy-MM-dd''T'''. | +| | | The integer represents days since epoch | +| | | matching the encoding used by Kafka Connect dates.| ++------------------------+------------------------------------------------------------+---------------------------------------------------+ | EXTRACTJSONFIELD | ``EXTRACTJSONFIELD(message, '$.log.cloud')`` | Given a string column in JSON format, extract | | | | the field that matches. | | | | | @@ -1119,7 +1127,7 @@ Scalar functions | | | will return ``My Txxx--nnn``. | +------------------------+------------------------------------------------------------+---------------------------------------------------+ | MASK_KEEP_RIGHT | ``MASK_KEEP_RIGHT(col1, numChars, 'X', 'x', 'n', '-')`` | Similar to the ``MASK`` function above, except | -| | | that the last or rightt-most ``numChars`` | +| | | that the last or right-most ``numChars`` | | | | characters will not be masked in any way. | | | | For example:``MASK_KEEP_RIGHT("My Test $123", 4)``| | | | will return ``Xx-Xxxx-$123``. | @@ -1131,7 +1139,7 @@ Scalar functions | | | will return ``Xx-Xest $123``. | +------------------------+------------------------------------------------------------+---------------------------------------------------+ | MASK_RIGHT | ``MASK_RIGHT(col1, numChars, 'X', 'x', 'n', '-')`` | Similar to the ``MASK`` function above, except | -| | | that only the last or rightt-most ``numChars`` | +| | | that only the last or right-most ``numChars`` | | | | characters will have any masking applied to them. | | | | For example: ``MASK_RIGHT("My Test $123", 4)`` | | | | will return ``My Test -nnn``. | @@ -1140,6 +1148,12 @@ Scalar functions +------------------------+------------------------------------------------------------+---------------------------------------------------+ | ROUND | ``ROUND(col1)`` | Round a value to the nearest BIGINT value. | +------------------------+------------------------------------------------------------+---------------------------------------------------+ +| STRINGTODATE | ``STRINGTODATE(col1, 'yyyy-MM-dd')`` | Converts a string value in the given | +| | | format into the INT value | +| | | that represents days since epoch. Single | +| | | quotes in the timestamp format can be escaped with| +| | | '', for example: 'yyyy-MM-dd''T'''. | ++------------------------+------------------------------------------------------------+---------------------------------------------------+ | STRINGTOTIMESTAMP | ``STRINGTOTIMESTAMP(col1, 'yyyy-MM-dd HH:mm:ss.SSS')`` | Converts a string value in the given | | | | format into the BIGINT value | | | | that represents the millisecond timestamp. Single | diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateToString.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateToString.java new file mode 100644 index 000000000000..98dc30f05a73 --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/DateToString.java @@ -0,0 +1,29 @@ +package io.confluent.ksql.function.udf.datetime; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +@UdfDescription(name = "datetostring", author = "Confluent", + description = "Converts an integer representing days since epoch to a date string using the given format pattern." + + " Note this is the format Kafka Connect uses to represent dates with no time component." + + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") +public class DateToString { + + private final LoadingCache formatters = + CacheBuilder.newBuilder() + .maximumSize(1000) + .build(CacheLoader.from(DateTimeFormatter::ofPattern)); + + @Udf(description = "Converts an integer representing days since epoch to a string using the given format pattern." + + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") + public String dateToString(final Integer daysSinceEpoch, final String formatPattern) { + final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern); + return LocalDate.ofEpochDay(daysSinceEpoch).format(formatter); + } + +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToDate.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToDate.java new file mode 100644 index 000000000000..c676b8e9598f --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/datetime/StringToDate.java @@ -0,0 +1,31 @@ +package io.confluent.ksql.function.udf.datetime; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +@UdfDescription(name = "stringtodate", author = "Confluent", + description = "Converts a string representation of a date into an integer representing" + + " days since epoch using the given format pattern." + + " Note this is the format Kafka Connect uses to represent dates with no time component." + + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") +public class StringToDate { + + private final LoadingCache formatters = + CacheBuilder.newBuilder() + .maximumSize(1000) + .build(CacheLoader.from(DateTimeFormatter::ofPattern)); + + @Udf(description = "Converts a string representation of a date into an integer representing" + + " days since epoch using the given format pattern." + + " The format pattern should be in the format expected by java.time.format.DateTimeFormatter") + public int stringToDate(final String formattedDate, final String formatPattern) { + DateTimeFormatter formatter = formatters.getUnchecked(formatPattern); + return ((int)LocalDate.parse(formattedDate, formatter).toEpochDay()); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateToStringTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateToStringTest.java new file mode 100644 index 000000000000..a461da4c6acf --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/DateToStringTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.confluent.ksql.function.KsqlFunctionException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; +import java.util.stream.IntStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class DateToStringTest { + + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + private DateToString udf; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp(){ + udf = new DateToString(); + } + + @Test + public void shouldConvertDateToString() { + // When: + final String result = udf.dateToString(16383, "yyyy-MM-dd"); + + // Then: + final String expectedResult = expectedResult(16383, "yyyy-MM-dd"); + assertThat(result, is(expectedResult)); + } + + @Test + public void shouldRoundTripWithStringToDate() { + final String format = "dd/MM/yyyy'Freya'"; + final StringToDate stringToDate = new StringToDate(); + IntStream.range(-10_000, 20_000) + .parallel() + .forEach(idx -> { + final String result = udf.dateToString(idx, format); + final String expectedResult = expectedResult(idx, format); + assertThat(result, is(expectedResult)); + + final int daysSinceEpoch = stringToDate.stringToDate(result, format); + assertThat(daysSinceEpoch, is(idx)); + }); + } + + @Test + public void shouldSupportEmbeddedChars() { + // When: + final Object result = udf.dateToString(12345, "yyyy-dd-MM'Fred'"); + + // Then: + final String expectedResult = expectedResult(12345, "yyyy-dd-MM'Fred'"); + assertThat(result, is(expectedResult)); + } + + @Test + public void shouldThrowIfFormatInvalid() { + expectedException.expect(UncheckedExecutionException.class); + expectedException.expectMessage("Unknown pattern letter: i"); + udf.dateToString(44444, "invalid"); + } + + @Test + public void shouldBeThreadSafe() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + shouldConvertDateToString(); + udf.dateToString(55555, "yyyy-MM-dd"); + }); + } + + @Test + public void shouldWorkWithManyDifferentFormatters() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + try { + final String pattern = "yyyy-MM-dd'X" + idx + "'"; + final String result = udf.dateToString(idx, pattern); + final String expectedResult = expectedResult(idx, pattern); + assertThat(result, is(expectedResult)); + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + }); + } + + private String expectedResult(final int daysSinceEpoch, final String formatPattern) { + SimpleDateFormat dateFormat = new SimpleDateFormat(formatPattern); + dateFormat.setCalendar(Calendar.getInstance(UTC)); + return dateFormat.format(new java.util.Date(daysSinceEpoch * MILLIS_PER_DAY)); + } + +} diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToDateTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToDateTest.java new file mode 100644 index 000000000000..40d6b2e0eb50 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToDateTest.java @@ -0,0 +1,130 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udf.datetime; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.confluent.ksql.function.KsqlFunctionException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.format.DateTimeParseException; +import java.util.Calendar; +import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; +import java.util.stream.IntStream; +import org.apache.kafka.connect.errors.DataException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StringToDateTest { + + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + private StringToDate udf; + + @Before + public void setUp(){ + udf = new StringToDate(); + } + + @Test + public void shouldConvertStringToDate() throws ParseException { + // When: + final int result = udf.stringToDate("2021-12-01", "yyyy-MM-dd"); + + // Then: + final int expectedResult = expectedResult("2021-12-01", "yyyy-MM-dd"); + assertThat(result, is(expectedResult)); + } + + @Test + public void shouldSupportEmbeddedChars() throws ParseException { + // When: + final Object result = udf.stringToDate("2021-12-01Fred", "yyyy-MM-dd'Fred'"); + + // Then: + final int expectedResult = expectedResult("2021-12-01Fred", "yyyy-MM-dd'Fred'"); + assertThat(result, is(expectedResult)); + } + + @Test(expected = UncheckedExecutionException.class) + public void shouldThrowIfFormatInvalid() { + udf.stringToDate("2021-12-01", "invalid"); + } + + @Test(expected = DateTimeParseException.class) + public void shouldThrowIfParseFails() { + udf.stringToDate("invalid", "yyyy-MM-dd"); + } + + @Test(expected = DateTimeParseException.class) + public void shouldThrowOnEmptyString() { + udf.stringToDate("", "yyyy-MM-dd"); + } + + @Test + public void shouldBeThreadSafe() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + try { + shouldConvertStringToDate(); + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + udf.stringToDate("1988-01-12", "yyyy-MM-dd"); + }); + } + + @Test + public void shouldWorkWithManyDifferentFormatters() { + IntStream.range(0, 10_000) + .parallel() + .forEach(idx -> { + try { + final String sourceDate = "2021-12-01X" + idx; + final String pattern = "yyyy-MM-dd'X" + idx + "'"; + final int result = udf.stringToDate(sourceDate, pattern); + final int expectedResult = expectedResult(sourceDate, pattern); + assertThat(result, is(expectedResult)); + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + }); + } + + + private int expectedResult(final String formattedDate, final String formatPattern) throws ParseException { + SimpleDateFormat dateFormat = new SimpleDateFormat(formatPattern); + dateFormat.setCalendar(Calendar.getInstance(UTC)); + Date parsedDate = dateFormat.parse(formattedDate); + Calendar calendar = Calendar.getInstance(UTC); + calendar.setTime(parsedDate); + if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 || + calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) { + fail("Date should not have any time fields set to non-zero values."); + } + return (int)(calendar.getTimeInMillis() / MILLIS_PER_DAY); + } + +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToTimestampTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToTimestampTest.java index 0e5a740a0f42..7cef3428d6aa 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToTimestampTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/StringToTimestampTest.java @@ -37,7 +37,7 @@ public void setUp(){ } @Test - public void shouldCovertStringToTimestamp() throws ParseException { + public void shouldConvertStringToTimestamp() throws ParseException { // When: final Object result = udf.evaluate("2021-12-01 12:10:11.123", "yyyy-MM-dd HH:mm:ss.SSS"); @@ -80,7 +80,7 @@ public void shouldThrowIfParseFails() { @Test(expected = KsqlFunctionException.class) public void shouldThrowOnEmptyString() { - udf.evaluate("invalid", "yyyy-MM-dd'T'HH:mm:ss.SSS"); + udf.evaluate("", "yyyy-MM-dd'T'HH:mm:ss.SSS"); } @Test @@ -89,7 +89,7 @@ public void shouldBeThreadSafe() { .parallel() .forEach(idx -> { try { - shouldCovertStringToTimestamp(); + shouldConvertStringToTimestamp(); } catch (final ParseException e) { Assert.fail(e.getMessage()); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimestampToStringTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimestampToStringTest.java index cb12399cdf39..0a0c575823fa 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimestampToStringTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/datetime/TimestampToStringTest.java @@ -41,7 +41,7 @@ public void setUp(){ } @Test - public void shouldCovertTimestampToString() { + public void shouldConvertTimestampToString() { // When: final Object result = udf.evaluate(1638360611123L, "yyyy-MM-dd HH:mm:ss.SSS"); @@ -128,14 +128,14 @@ public void shouldThrowIfTooManyParameters() { public void shouldThrowIfFormatInvalid() { expectedException.expect(KsqlFunctionException.class); expectedException.expectMessage("Unknown pattern letter: i"); - udf.evaluate("2021-12-01 12:10:11.123", "invalid"); + udf.evaluate(1638360611123L, "invalid"); } @Test public void shouldThrowIfNotTimestamp() { expectedException.expect(KsqlFunctionException.class); expectedException.expectMessage("java.lang.String cannot be cast to java.lang.Long"); - udf.evaluate("invalid", "2021-12-01 12:10:11.123"); + udf.evaluate("invalid", "yyyy-MM-dd HH:mm:ss.SSS"); } @Test @@ -143,7 +143,7 @@ public void shouldBeThreadSafe() { IntStream.range(0, 10_000) .parallel() .forEach(idx -> { - shouldCovertTimestampToString(); + shouldConvertTimestampToString(); udf.evaluate(1538361611123L, "yyyy-MM-dd HH:mm:ss.SSS"); }); } diff --git a/ksql-engine/src/test/resources/query-validation-tests/datestring.json b/ksql-engine/src/test/resources/query-validation-tests/datestring.json new file mode 100644 index 000000000000..edf7433a8012 --- /dev/null +++ b/ksql-engine/src/test/resources/query-validation-tests/datestring.json @@ -0,0 +1,36 @@ +{ + "comments": [ + "You can specify multiple statements per test case, i.e., to set up the various streams needed", + "for joins etc, but currently only the final topology will be verified. This should be enough", + "for most tests as we can simulate the outputs from previous stages into the final stage. If we", + "take a modular approach to testing we can still verify that it all works correctly, i.e, if we", + "verify the output of a select or aggregate is correct, we can use simulated output to feed into", + "a join or another aggregate." + ], + "tests": [ + { + "name": "date to string", + "statements": [ + "CREATE STREAM TEST (ID bigint, START_DATE int, DATE_FORMAT varchar) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM DATE_STREAM AS select ID, datetostring(START_DATE, DATE_FORMAT) as CUSTOM_FORMATTED_START_DATE from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"ID": 1, "START_DATE": 17662, "DATE_FORMAT": "yyyy-MM-dd"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"ID": 2, "START_DATE": 18027, "DATE_FORMAT": "dd/MM/yyyy"}, "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": {"ID": 3, "START_DATE": 18993, "DATE_FORMAT": "dd-MMM-yyyy"}, "timestamp": 0}, + {"topic": "test_topic", "key": 4, "value": {"ID": 4, "START_DATE": 0, "DATE_FORMAT": "dd-MM-yyyy"}, "timestamp": 0}, + {"topic": "test_topic", "key": 5, "value": {"ID": 5, "START_DATE": -1, "DATE_FORMAT": "dd-MM-yyyy'Sophia'"}, "timestamp": 0} + + ], + "outputs": [ + {"topic": "DATE_STREAM", "key": 1, "value": {"ID": 1, "CUSTOM_FORMATTED_START_DATE": "2018-05-11"}, "timestamp": 0}, + {"topic": "DATE_STREAM", "key": 2, "value": {"ID": 2, "CUSTOM_FORMATTED_START_DATE": "11/05/2019"}, "timestamp": 0}, + {"topic": "DATE_STREAM", "key": 3, "value": {"ID": 3, "CUSTOM_FORMATTED_START_DATE": "01-Jan-2022"}, "timestamp": 0}, + {"topic": "DATE_STREAM", "key": 4, "value": {"ID": 4, "CUSTOM_FORMATTED_START_DATE": "01-01-1970"}, "timestamp": 0}, + {"topic": "DATE_STREAM", "key": 5, "value": {"ID": 5, "CUSTOM_FORMATTED_START_DATE": "31-12-1969Sophia"}, "timestamp": 0} + ] + } + ] +} + + diff --git a/ksql-engine/src/test/resources/query-validation-tests/stringdate.json b/ksql-engine/src/test/resources/query-validation-tests/stringdate.json new file mode 100644 index 000000000000..a6cd6eb56404 --- /dev/null +++ b/ksql-engine/src/test/resources/query-validation-tests/stringdate.json @@ -0,0 +1,31 @@ +{ + "comments": [ + "You can specify multiple statements per test case, i.e., to set up the various streams needed", + "for joins etc, but currently only the final topology will be verified. This should be enough", + "for most tests as we can simulate the outputs from previous stages into the final stage. If we", + "take a modular approach to testing we can still verify that it all works correctly, i.e, if we", + "verify the output of a select or aggregate is correct, we can use simulated output to feed into", + "a join or another aggregate." + ], + "tests": [ + { + "name": "string to date", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, date varchar, format varchar) WITH (kafka_topic='test_topic', value_format='DELIMITED', key='ID');", + "CREATE STREAM TS AS select id, stringtodate(date, format) as ts from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0,zero,2018-05-11,yyyy-MM-dd", "timestamp": 0}, + {"topic": "test_topic", "key": 1, "value": "1,zero,11/05/2019,dd/MM/yyyy", "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": "2,zero,01-Jan-2022,dd-MMM-yyyy", "timestamp": 0}, + {"topic": "test_topic", "key": 3, "value": "3,yyy,01-01-1970,dd-MM-yyyy", "timestamp": 0} + ], + "outputs": [ + {"topic": "TS", "key": 0, "value": "0,17662", "timestamp": 0}, + {"topic": "TS", "key": 1, "value": "1,18027", "timestamp": 0}, + {"topic": "TS", "key": 2, "value": "2,18993", "timestamp": 0}, + {"topic": "TS", "key": 3, "value": "3,0", "timestamp": 0} + ] + } + ] +} \ No newline at end of file