Skip to content

Commit

Permalink
Converted stringtotimestamp and timestamptostring to latest UDF format (#1851)
Browse files Browse the repository at this point in the history

Added stringtodate and datetostring UDFs (#1851)
  • Loading branch information
Andy Bryant authored and andybryant committed Sep 27, 2018
1 parent d425cae commit 1c4291a
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 151 deletions.
19 changes: 13 additions & 6 deletions ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -666,16 +666,23 @@ public void shouldListFunctions() {
public void shouldDescribeScalarFunction() throws Exception {
final String expectedOutput =
"Name : TIMESTAMPTOSTRING\n" +
"Author : confluent\n" +
"Author : Confluent\n" +
"Overview : Converts a BIGINT millisecond timestamp value into the string " +
"representation of the \n" +
" timestamp in the given format.\n" +
"Type : scalar\n" +
"Jar : internal\n" +
"Variations : \n" +
"\n" +
"\tVariation : TIMESTAMPTOSTRING(BIGINT, VARCHAR)\n" +
"\tReturns : VARCHAR\n";
"Variations : \n";

localCli.handleLine("describe function timestamptostring;");
assertThat(terminal.getOutputString(), containsString(expectedOutput));
final String outputString = terminal.getOutputString();
assertThat(outputString, containsString(expectedOutput));

// variations for Udfs are loaded non-deterministically. Don't assume which variation is first
final String expectedVariation =
"\tVariation : TIMESTAMPTOSTRING(BIGINT, VARCHAR)\n" +
"\tReturns : VARCHAR\n";
assertThat(outputString, containsString(expectedVariation));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import io.confluent.ksql.function.udaf.topk.TopKAggregateFunctionFactory;
import io.confluent.ksql.function.udaf.topkdistinct.TopkDistinctAggFunctionFactory;
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.datetime.StringToTimestamp;
import io.confluent.ksql.function.udf.datetime.TimestampToString;
import io.confluent.ksql.function.udf.geo.GeoDistanceKudf;
import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
Expand Down Expand Up @@ -75,7 +73,6 @@ private InternalFunctionRegistry(
private void init() {
addStringFunctions();
addMathFunctions();
addDateTimeFunctions();
addGeoFunctions();
addJsonFunctions();
addStructFieldFetcher();
Expand Down Expand Up @@ -249,25 +246,6 @@ private void addMathFunctions() {

}


private void addDateTimeFunctions() {

final KsqlFunction timestampToString = new KsqlFunction(
Schema.OPTIONAL_STRING_SCHEMA,
Arrays.asList(Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
"TIMESTAMPTOSTRING",
TimestampToString.class);
addFunction(timestampToString);

final KsqlFunction stringToTimestamp = new KsqlFunction(
Schema.OPTIONAL_INT64_SCHEMA,
Arrays.asList(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA),
"STRINGTOTIMESTAMP",
StringToTimestamp.class);
addFunction(stringToTimestamp);

}

private void addGeoFunctions() {
final KsqlFunction geoDistance = new KsqlFunction(
Schema.OPTIONAL_FLOAT64_SCHEMA,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
Expand All @@ -9,18 +25,20 @@
import java.time.format.DateTimeFormatter;

@UdfDescription(name = "datetostring", author = "Confluent",
description = "Converts an integer representing days since epoch to a date string using the given format pattern."
+ " Note this is the format Kafka Connect uses to represent dates with no time component."
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
description = "Converts an integer representing days since epoch to a date string"
+ " using the given format pattern. Note this is the format Kafka Connect uses"
+ " to represent dates with no time component. The format pattern should be"
+ " in the format expected by java.time.format.DateTimeFormatter")
public class DateToString {

private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Udf(description = "Converts an integer representing days since epoch to a string using the given format pattern."
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
@Udf(description = "Converts an integer representing days since epoch to a string"
+ " using the given format pattern. The format pattern should be in the format"
+ " expected by java.time.format.DateTimeFormatter")
public String dateToString(final Integer daysSinceEpoch, final String formatPattern) {
final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
return LocalDate.ofEpochDay(daysSinceEpoch).format(formatter);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.ksql.function.udf.datetime;

import com.google.common.cache.CacheBuilder;
Expand All @@ -12,7 +28,8 @@
description = "Converts a string representation of a date into an integer representing"
+ " days since epoch using the given format pattern."
+ " Note this is the format Kafka Connect uses to represent dates with no time component."
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
+ " The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter")
public class StringToDate {

private final LoadingCache<String, DateTimeFormatter> formatters =
Expand All @@ -22,9 +39,10 @@ public class StringToDate {

@Udf(description = "Converts a string representation of a date into an integer representing"
+ " days since epoch using the given format pattern."
+ " The format pattern should be in the format expected by java.time.format.DateTimeFormatter")
+ " The format pattern should be in the format expected by"
+ " java.time.format.DateTimeFormatter")
public int stringToDate(final String formattedDate, final String formatPattern) {
DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
return ((int)LocalDate.parse(formattedDate, formatter).toEpochDay());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,30 @@

package io.confluent.ksql.function.udf.datetime;

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;

public class StringToTimestamp implements Kudf {
@UdfDescription(name = "stringtotimestamp", author = "Confluent",
description = "Converts a string value in the given format into the BIGINT value"
+ " that represents the millisecond timestamp.")
public class StringToTimestamp {

private StringToTimestampParser timestampParser;
private final LoadingCache<String, StringToTimestampParser> parsers =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(StringToTimestampParser::new));

@Override
public Object evaluate(final Object... args) {
if (args.length != 2) {
throw new KsqlFunctionException("StringToTimestamp udf should have two input argument:"
+ " date value and format.");
}
try {
ensureInitialized(args);
return timestampParser.parse(args[0].toString());
} catch (final Exception e) {
throw new KsqlFunctionException("Exception running StringToTimestamp(" + args[0] + ", "
+ args[1] + ") : " + e.getMessage(), e);
}
}

private void ensureInitialized(final Object[] args) {
if (timestampParser == null) {
timestampParser = new StringToTimestampParser(args[1].toString());
}
/**
 * Parses a formatted timestamp string into a BIGINT millisecond timestamp.
 *
 * <p>The parser for {@code formatPattern} is fetched from the {@code parsers} cache, which
 * builds a {@link StringToTimestampParser} per distinct pattern, and the parse result is
 * returned unchanged.
 *
 * @param formattedTimestamp the timestamp text to parse
 * @param formatPattern the pattern describing {@code formattedTimestamp}
 * @return the value produced by {@code StringToTimestampParser.parse}
 */
@Udf(description = "Converts a string value in the given format into the BIGINT value"
    + " that represents the millisecond timestamp."
    + " Single quotes in the timestamp format can be escaped with '',"
    + " for example: 'yyyy-MM-dd''T''HH:mm:ssX'.")
public long stringToTimestamp(final String formattedTimestamp, final String formatPattern) {
  return parsers.getUnchecked(formatPattern).parse(formattedTimestamp);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,52 @@

package io.confluent.ksql.function.udf.datetime;

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampToString implements Kudf {
@UdfDescription(name = "timestamptostring", author = "Confluent",
description = "Converts a BIGINT millisecond timestamp value into"
+ " the string representation of the timestamp in the given format.")
public class TimestampToString {

private DateTimeFormatter threadSafeFormatter;
private final LoadingCache<String, DateTimeFormatter> formatters =
CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(DateTimeFormatter::ofPattern));

@Override
public Object evaluate(final Object... args) {
if (args.length < 2) {
throw new KsqlFunctionException("TimestampToString udf should have at least "
+ "two input arguments: date value and format.");
}

if (args.length > 3) {
throw new KsqlFunctionException("TimestampToString udf should have at most "
+ "three input arguments: date value, format and zone.");
}

try {
ensureInitialized(args);
final Timestamp timestamp = new Timestamp((Long) args[0]);
final ZoneId zoneId =
(args.length == 3) ? ZoneId.of(args[2].toString()) : ZoneId.systemDefault();
return timestamp.toInstant()
.atZone(zoneId)
.format(threadSafeFormatter);
} catch (final Exception e) {
throw new KsqlFunctionException("Exception running TimestampToString(" + args[0] + " , "
+ args[1] + ((args.length == 3) ? (" , " + args[2]) : "") + ") : " + e.getMessage(), e);
}
/**
 * Formats a millisecond epoch timestamp as text using the supplied pattern.
 *
 * <p>The instant is rendered in {@link ZoneId#systemDefault()}, so the result depends on the
 * default time zone of the JVM evaluating the function. The formatter is looked up in the
 * {@code formatters} cache, keyed by {@code formatPattern}.
 *
 * @param millisSinceEpoch milliseconds since the epoch
 * @param formatPattern a {@link DateTimeFormatter} pattern
 * @return the timestamp formatted with {@code formatPattern} in the system default zone
 */
@Udf(description = "Converts a BIGINT millisecond timestamp value into the"
    + " string representation of the timestamp in the given format. Single quotes in the"
    + " timestamp format can be escaped with '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'."
    + " The format pattern should be in the format expected"
    + " by java.time.format.DateTimeFormatter.")
public String timestampToString(final long millisSinceEpoch, final String formatPattern) {
  final Timestamp timestamp = new Timestamp(millisSinceEpoch);
  final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
  return timestamp.toInstant()
      .atZone(ZoneId.systemDefault())
      .format(formatter);
}

private void ensureInitialized(final Object[] args) {
if (threadSafeFormatter == null) {
threadSafeFormatter = DateTimeFormatter.ofPattern(args[1].toString());
}
/**
 * Formats a millisecond epoch timestamp as text in an explicitly named time zone.
 *
 * <p>The zone is resolved with {@link ZoneId#of(String)}, so {@code timeZone} must be an ID
 * that method accepts (for example a region ID such as {@code Europe/London}, or
 * {@code UTC}). Abbreviations such as {@code PDT} are rejected by {@code ZoneId.of}. The
 * formatter is looked up in the {@code formatters} cache before the zone is resolved.
 *
 * @param millisSinceEpoch milliseconds since the epoch
 * @param formatPattern a {@link DateTimeFormatter} pattern
 * @param timeZone a zone ID understood by {@link ZoneId#of(String)}
 * @return the timestamp formatted with {@code formatPattern} in {@code timeZone}
 * @throws java.time.DateTimeException if {@code timeZone} is not a valid or known zone ID
 */
@Udf(description = "Converts a BIGINT millisecond timestamp value into the"
    + " string representation of the timestamp in the given format. Single quotes in the"
    + " timestamp format can be escaped with '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'."
    + " The format pattern should be in the format expected by"
    + " java.time.format.DateTimeFormatter."
    + " TIMEZONE is a time zone ID accepted by java.time.ZoneId, for example: \"UTC\","
    + " \"America/Los_Angeles\", \"Europe/London\".")
public String timestampToString(final long millisSinceEpoch, final String formatPattern,
    final String timeZone) {
  final Timestamp timestamp = new Timestamp(millisSinceEpoch);
  final DateTimeFormatter formatter = formatters.getUnchecked(formatPattern);
  final ZoneId zoneId = ZoneId.of(timeZone);
  return timestamp.toInstant()
      .atZone(zoneId)
      .format(formatter);
}

}
Loading

0 comments on commit 1c4291a

Please sign in to comment.