diff --git a/docs/syntax-reference.rst b/docs/syntax-reference.rst index 4343814ddf35..5289cc2a9864 100644 --- a/docs/syntax-reference.rst +++ b/docs/syntax-reference.rst @@ -1058,8 +1058,21 @@ Scalar functions | | | quotes in the timestamp format can be escaped with| | | | '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'. | +------------------------+------------------------------------------------------------+---------------------------------------------------+ -| SUBSTRING | ``SUBSTRING(col1, 2, 5)`` | Return the substring with the start and end | -| | | indices. | +| SUBSTRING | ``SUBSTRING(col1, 2, 5)`` | ``SUBSTRING(str, pos, [len]``. | +| | | Return a substring of ``str`` that starts at | +| | | ``pos`` and had length ``len``, or continues to | +| | | the end of the string. | +| | | | +| | | NOTE: prior to v5.1 of KSQL the syntax was: | +| | | ``SUBSTRING(str, start, [end]`` | +| | | Where ``start`` and ``end`` where base-zero | +| | | indexes to start (inclusive) and end (exclusive) | +| | | the substring. | +| | | | +| | | It is possible to switch back to this legacy mode | +| | | by setting | +| | | ``ksql.functions.substring.legacy.args`` to | +| | | ``true`` | +------------------------+------------------------------------------------------------+---------------------------------------------------+ | TIMESTAMPTOSTRING | ``TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS')`` | Converts a BIGINT millisecond timestamp value into| | | | the string representation of the timestamp in | diff --git a/docs/udf.rst b/docs/udf.rst index 29a0d78e205e..169d007685e3 100644 --- a/docs/udf.rst +++ b/docs/udf.rst @@ -44,8 +44,9 @@ Conversely, using boxed types indicates the function can accept null values for It is up to the implementor of the UDF to chose which is the most appropriate. A common pattern is to return ``null`` if the input is ``null``, though generally this is only for parameters that are expected to be supplied from the source row being processed. For example, -a ``substring(String value, int beginIndex)`` UDF might return null if ``value`` is null, but a -null ``beginIndex`` parameter would be treated as an error, and hence should be a primitive. +a ``substring(String str, int pos)`` UDF might return null if ``str`` is null, but a +null ``pos`` parameter would be treated as an error, and hence should be a primitive. +(In actual fact, the in-built substring is more lenient and would return null if pos was null). The return type of a UDF can also be a primitive or boxed type. A primitive return type indicates the function will never return ``null``, where as a boxed type indicates it may return ``null``. @@ -129,15 +130,12 @@ of the UDF does, for example: .. code:: java - @Udf(description = "Returns a string that is a substring of this string. The" - + " substring begins with the character at the specified startIndex and" - + " extends to the end of this string.") - public String substring(final String value, final int startIndex) + @Udf(description = "Returns a substring of str that starts at pos" + + " and continues to the end of the string") + public String substring(final String str, final int pos) - @Udf(description = "Returns a string that is a substring of this string. The" - + " substring begins with the character at the specified startIndex and" - + " extends to the character at endIndex -1.") - public String substring(final String value, final int startIndex, final int endIndex) + @Udf(description = "Returns a substring of str that starts at pos and is of length len") + public String substring(final String str, final int pos, final int len) UdfParameter Annotation ~~~~~~~~~~~~~~~~~~~~~~~ @@ -154,8 +152,8 @@ can be used to better describe what the parameter does, for example: @Udf public String substring( - @UdfParameter("Value") final String value, - @UdfParameter(value = "Value", description = "Zero based start index") final int startIndex) + @UdfParameter("str") final String str, + @UdfParameter(value = "pos", description = "Starting position of the substring") final int pos) Configurable UDF ~~~~~~~~~~~~~~~~ diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index c3611500059c..bb394a6c2c09 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.StringTokenizer; import org.apache.commons.lang3.StringUtils; import org.jline.reader.EndOfFileException; import org.jline.terminal.Terminal; @@ -700,19 +701,21 @@ private void printQueryDescriptionList(final QueryDescriptionList queryDescripti private void printFunctionDescription(final FunctionDescriptionList describeFunction) { final String functionName = describeFunction.getName().toUpperCase(); - writer().printf("%-12s: %s%n", "Name", functionName); + final String baseFormat = "%-12s: %s%n"; + final String subFormat = "\t%-12s: %s%n"; + writer().printf(baseFormat, "Name", functionName); if (!describeFunction.getAuthor().trim().isEmpty()) { - writer().printf("%-12s: %s%n", "Author", describeFunction.getAuthor()); + writer().printf(baseFormat, "Author", describeFunction.getAuthor()); } if (!describeFunction.getVersion().trim().isEmpty()) { - writer().printf("%-12s: %s%n", "Version", describeFunction.getVersion()); + writer().printf(baseFormat, "Version", describeFunction.getVersion()); } - if (!describeFunction.getDescription().trim().isEmpty()) { - writer().printf("%-12s: %s%n", "Overview", describeFunction.getDescription()); - } - writer().printf("%-12s: %s%n", "Type", describeFunction.getType().name()); - writer().printf("%-12s: %s%n", "Jar", describeFunction.getPath()); - writer().printf("%-12s: %n", "Variations"); + + printDescription(baseFormat, "Overview", describeFunction.getDescription()); + + writer().printf(baseFormat, "Type", describeFunction.getType().name()); + writer().printf(baseFormat, "Jar", describeFunction.getPath()); + writer().printf(baseFormat, "Variations", ""); final Collection functions = describeFunction.getFunctions(); functions.forEach(functionInfo -> { final String arguments = functionInfo.getArguments().stream() @@ -722,18 +725,60 @@ private void printFunctionDescription(final FunctionDescriptionList describeFunc .collect(Collectors.joining(", ")); writer().printf("%n\t%-12s: %s(%s)%n", "Variation", functionName, arguments); - writer().printf("\t%-12s: %s%n", "Returns", functionInfo.getReturnType()); - if (!functionInfo.getDescription().trim().isEmpty()) { - writer().printf("\t%-12s: %s%n", "Description", functionInfo.getDescription()); - } - - functionInfo.getArguments().stream() - .filter(a -> !a.getDescription().trim().isEmpty()) - .forEach(a -> writer().printf("\t%-12s: %s%n", a.getName(), a.getDescription())); + + writer().printf(subFormat, "Returns", functionInfo.getReturnType()); + printDescription(subFormat, "Description", functionInfo.getDescription()); + functionInfo.getArguments() + .forEach(a -> printDescription(subFormat, a.getName(), a.getDescription())); } ); } + private void printDescription(final String format, final String name, final String description) { + final String trimmed = description.trim(); + if (trimmed.isEmpty()) { + return; + } + + final int labelLen = String.format(format.replace("%n", ""), name, "") + .replace("\t", " ") + .length(); + + final int width = Math.max(getWidth(), 80) - labelLen; + + final String fixedWidth = splitLongLine(trimmed, width); + + final String indent = String.format("%-" + labelLen + "s", ""); + + final String result = fixedWidth + .replace(System.lineSeparator(), System.lineSeparator() + indent); + + writer().printf(format, name, result); + } + + private static String splitLongLine(final String input, final int maxLineLength) { + final StringTokenizer spaceTok = new StringTokenizer(input, " \n", true); + final StringBuilder output = new StringBuilder(input.length()); + int lineLen = 0; + while (spaceTok.hasMoreTokens()) { + final String word = spaceTok.nextToken(); + final boolean isNewLineChar = word.equals("\n"); + + if (isNewLineChar || lineLen + word.length() > maxLineLength) { + output.append(System.lineSeparator()); + lineLen = 0; + + if (isNewLineChar) { + continue; + } + } + + output.append(word); + lineLen += word.length(); + } + return output.toString(); + } + private void printAsJson(final Object o) throws IOException { if (!((o instanceof PropertiesList || (o instanceof KsqlEntityList)))) { log.warn( diff --git a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java index 708ca23c952c..228253a9da06 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/CliTest.java @@ -63,6 +63,7 @@ import io.confluent.ksql.version.metrics.VersionCheckerAgent; import static io.confluent.ksql.TestResult.build; +import static io.confluent.ksql.testutils.AssertEventually.assertThatEventually; import static io.confluent.ksql.util.KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG; import static io.confluent.ksql.util.KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT; import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG; @@ -265,14 +266,15 @@ private static void selectWithLimit(String selectQuery, int limit, TestResult.Or @Test public void testPrint() throws InterruptedException { - - Thread wait = new Thread(() -> run("print 'ORDER_TOPIC' FROM BEGINNING INTERVAL 2;", false)); - wait.start(); - Thread.sleep(1000); - wait.interrupt(); - - String terminalOutput = terminal.getOutputString(); - assertThat(terminalOutput, containsString("Format:JSON")); + final Thread thread = + new Thread(() -> run("print 'ORDER_TOPIC' FROM BEGINNING INTERVAL 2;", false)); + thread.start(); + + try { + assertThatEventually(() -> terminal.getOutputString(), containsString("Format:JSON")); + } finally { + thread.interrupt(); + } } @Test @@ -564,27 +566,34 @@ public void shouldDescribeScalarFunction() throws Exception { @Test public void shouldDescribeOverloadedScalarFunction() throws Exception { - final String expectedSummary = + // Given: + localCli.handleLine("describe function substring;"); + + // Then: + final String output = terminal.getOutputString(); + + // Summary output: + assertThat(output, containsString( "Name : SUBSTRING\n" + "Author : Confluent\n" - + "Overview : returns a substring of the passed in value\n" - + "Type : scalar\n" + + "Overview : Returns a substring of the passed in value.\n" + )); + assertThat(output, containsString( + "Type : scalar\n" + "Jar : internal\n" - + "Variations : \n"; + + "Variations :" + )); - final String expectedVariant = - "\tVariation : SUBSTRING(value VARCHAR, startIndex INT, endIndex INT)\n" + // Variant output: + assertThat(output, containsString( + "\tVariation : SUBSTRING(str VARCHAR, pos INT)\n" + "\tReturns : VARCHAR\n" - + "\tDescription : Returns a string that is a substring of this string. " - + "The substring begins with the character at the specified startIndex and extends to the character at endIndex -1.\n" - + "\tstartIndex : The zero-based start index, inclusive.\n" - + "\tendIndex : The zero-based end index, exclusive."; - - localCli.handleLine("describe function substring;"); - - final String output = terminal.getOutputString(); - assertThat(output, containsString(expectedSummary)); - assertThat(output, containsString(expectedVariant)); + + "\tDescription : Returns a substring of str that starts at pos and continues to the end" + )); + assertThat(output, containsString( + "\tstr : The source string. If null, then function returns null.\n" + + "\tpos : The base-one position the substring starts from." + )); } @Test diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index bc0871ff35b5..dea375f95da3 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,11 +18,14 @@ import com.google.common.collect.ImmutableList; +import io.confluent.ksql.rest.entity.ArgumentInfo; import io.confluent.ksql.rest.entity.EntityQueryId; +import io.confluent.ksql.rest.entity.FunctionDescriptionList; +import io.confluent.ksql.rest.entity.FunctionInfo; +import io.confluent.ksql.rest.entity.FunctionType; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.FieldInfo; import io.confluent.ksql.rest.util.EntityUtil; -import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.After; import org.junit.Test; @@ -68,8 +71,8 @@ @RunWith(Parameterized.class) public class ConsoleTest { - private TestTerminal terminal; - private KsqlRestClient client; + private final TestTerminal terminal; + private final KsqlRestClient client; @Parameterized.Parameters(name = "{0}") public static Collection data() { @@ -163,6 +166,77 @@ public void shouldPrintTopicDescribeExtended() throws IOException { } } + @Test + public void shouldPrintFunctionDescription() throws IOException { + final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( + new FunctionDescriptionList( + "DESCRIBE FUNCTION foo;", + "FOO", + "Description that is very, very, very, very, very, very, very, very, very, " + + "very, very, very, very, very, very, very, very, very, very, very, very long\n" + + "and containing new lines\n" + + "\tAND TABS\n" + + "too!", + "Andy", + "v1.1.0", + "some.jar", + ImmutableList.of(new FunctionInfo( + ImmutableList.of( + new ArgumentInfo( + "arg1", + "INT", + "Another really, really, really, really, really, really, really," + + "really, really, really, really, really, really, really, really " + + " really, really, really, really, really, really, really, long\n" + + "description\n" + + "\tContaining Tabs\n" + + "and stuff" + ) + ), + "LONG", + "The function description, which too can be really, really, really, " + + "really, really, really, really, really, really, really, really, really, " + + "really, really, really, really, really, really, really, really, long\n" + + "and contains\n\ttabs and stuff" + )), FunctionType.scalar))); + + terminal.printKsqlEntityList(entityList); + + final String output = terminal.getOutputString(); + if (terminal.getOutputFormat() == OutputFormat.JSON) { + assertThat(output, containsString("\"name\" : \"FOO\"")); + } else { + final String expected = "" + + "Name : FOO\n" + + "Author : Andy\n" + + "Version : v1.1.0\n" + + "Overview : Description that is very, very, very, very, very, very, very, very, very, very, very, \n" + + " very, very, very, very, very, very, very, very, very, very long\n" + + " and containing new lines\n" + + " \tAND TABS\n" + + " too!\n" + + "Type : scalar\n" + + "Jar : some.jar\n" + + "Variations : \n" + + "\n" + + "\tVariation : FOO(arg1 INT)\n" + + "\tReturns : LONG\n" + + "\tDescription : The function description, which too can be really, really, really, really, really, \n" + + " really, really, really, really, really, really, really, really, really, really, \n" + + " really, really, really, really, really, long\n" + + " and contains\n" + + " \ttabs and stuff\n" + + "\targ1 : Another really, really, really, really, really, really, really,really, really, \n" + + " really, really, really, really, really, really really, really, really, really, \n" + + " really, really, really, long\n" + + " description\n" + + " \tContaining Tabs\n" + + " and stuff"; + + assertThat(output, containsString(expected)); + } + } + private List buildTestSchema(int size) { SchemaBuilder dataSourceBuilder = SchemaBuilder.struct().name("TestSchema"); for (int i = 0; i < size; i++) { diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 25c53c2c5f57..22ebbdf43542 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -44,7 +44,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String KSQ_FUNCTIONS_PROPERTY_PREFIX = KSQL_CONFIG_PROPERTY_PREFIX + "functions."; - public static final String KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX = + static final String KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX = KSQ_FUNCTIONS_PROPERTY_PREFIX + "_global_."; public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions"; @@ -93,6 +93,19 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { + "'CREATE STREAM S AS ...' will create a topic 'thing-S', where as the statement " + "'CREATE STREAM S WITH(KAFKA_TOPIC = 'foo') AS ...' will create a topic 'foo'."; + public static final String KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG = + KSQ_FUNCTIONS_PROPERTY_PREFIX + "substring.legacy.args"; + private static final String + KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS = "Switch the SUBSTRING function into legacy mode," + + " i.e. back to how it was in version 5.0 and earlier of KSQL." + + " Up to version 5.0.x substring took different args:" + + " VARCHAR SUBSTRING(str VARCHAR, startIndex INT, endIndex INT), where startIndex and" + + " endIndex were both base-zero indexed, e.g. a startIndex of '0' selected the start of the" + + " string, and the last argument is a character index, rather than the length of the" + + " substring to extract. Later versions of KSQL use:" + + " VARCHAR SUBSTRING(str VARCHAR, pos INT, length INT), where pos is base-one indexed," + + " and the last argument is the length of the substring to extract."; + public static final String defaultSchemaRegistryUrl = "http://localhost:8081"; @@ -103,9 +116,9 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { public static final String DEFAULT_EXT_DIR = "ext"; - private static final Collection COMPATIBILTY_BREAKING_CONFIG_DEFS + private static final Collection COMPATIBLY_BREAKING_CONFIG_DEBS = ImmutableList.of( - new CompatibiltyBreakingConfigDef( + new CompatibilityBreakingConfigDef( KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT, @@ -113,7 +126,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Importance.MEDIUM, "Second part of the prefix for persistent queries. For instance if " + "the prefix is query_ the query name will be ksql_query_1."), - new CompatibiltyBreakingConfigDef( + new CompatibilityBreakingConfigDef( KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, ConfigDef.Type.STRING, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT, @@ -121,9 +134,17 @@ public class KsqlConfig extends AbstractConfig implements Cloneable { ConfigDef.Importance.MEDIUM, "Suffix for state store names in Tables. For instance if the suffix is " + "_ksql_statestore the state " - + "store name would be ksql_query_1_ksql_statestore _ksql_statestore ")); + + "store name would be ksql_query_1_ksql_statestore _ksql_statestore "), + new CompatibilityBreakingConfigDef( + KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, + ConfigDef.Type.BOOLEAN, + true, + false, + ConfigDef.Importance.LOW, + KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_DOCS) + ); - private static class CompatibiltyBreakingConfigDef { + private static class CompatibilityBreakingConfigDef { private final String name; private final ConfigDef.Type type; private final Object defaultValueOld; @@ -131,12 +152,12 @@ private static class CompatibiltyBreakingConfigDef { private final ConfigDef.Importance importance; private final String documentation; - CompatibiltyBreakingConfigDef(final String name, - final ConfigDef.Type type, - final Object defaultValueOld, - final Object defaultValueCurrent, - final ConfigDef.Importance importance, - final String documentation) { + CompatibilityBreakingConfigDef(final String name, + final ConfigDef.Type type, + final Object defaultValueOld, + final Object defaultValueCurrent, + final ConfigDef.Importance importance, + final String documentation) { this.name = name; this.type = type; this.defaultValueOld = defaultValueOld; @@ -243,8 +264,8 @@ private static ConfigDef configDef(final boolean current) { + " calling System.exit or executing processes" ) .withClientSslSupport(); - for (final CompatibiltyBreakingConfigDef compatiblityConfigDef - : COMPATIBILTY_BREAKING_CONFIG_DEFS) { + for (final CompatibilityBreakingConfigDef compatiblityConfigDef + : COMPATIBLY_BREAKING_CONFIG_DEBS) { if (current) { compatiblityConfigDef.defineCurrent(configDef); } else { @@ -469,8 +490,8 @@ public KsqlConfig cloneWithPropertyOverwrite(final Map props) { public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map props) { final KsqlConfig originalConfig = new KsqlConfig(false, props); final Map mergedProperties = new HashMap<>(values()); - COMPATIBILTY_BREAKING_CONFIG_DEFS.stream() - .map(CompatibiltyBreakingConfigDef::getName) + COMPATIBLY_BREAKING_CONFIG_DEBS.stream() + .map(CompatibilityBreakingConfigDef::getName) .forEach( k -> mergedProperties.put(k, originalConfig.get(k))); return new KsqlConfig(true, mergedProperties, ksqlStreamConfigProps); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java index 2d3350e04622..eb82f99de2f1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/Substring.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,35 +16,132 @@ package io.confluent.ksql.function.udf.string; +import io.confluent.common.Configurable; +import io.confluent.common.config.ConfigException; import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.Objects; + @SuppressWarnings("unused") // Invoked via reflection. @UdfDescription(name = "substring", author = "Confluent", - description = "returns a substring of the passed in value") -public class Substring { + description = "Returns a substring of the passed in value.\n" + + "The behaviour of this function changed in release 5.1. " + + "It is possible to switch the function back to pre-v5.1 functionality via the setting:\n" + + "\t" + KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG + "\n" + + "This can be set globally, through the server configuration file, " + + "or per sessions or query via the set command.") +public class Substring implements Configurable { + + private Impl impl = new CurrentImpl(); + + @Override + public void configure(final Map props) { + final boolean legacyArgs = + getProps(props, KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, false); - @Udf(description = "Returns a string that is a substring of this string. The" - + " substring begins with the character at the specified startIndex and" - + " extends to the end of this string, inclusive.") + impl = legacyArgs ? new LegacyImpl() : new CurrentImpl(); + } + + @Udf(description = "Returns a substring of str that starts at pos" + + " and continues to the end of the string") public String substring( - @UdfParameter("value") final String value, - @UdfParameter(value = "startIndex", - description = "The zero-based starting index") final int startIndex) { - return value.substring(startIndex); + @UdfParameter(value = "str", + description = "The source string. If null, then function returns null.") final String str, + @UdfParameter(value = "pos", + description = "The base-one position the substring starts from." + + " If null, then function returns null." + + " (If in legacy mode, this argument is base-zero)") final Integer pos) { + return impl.substring(str, pos); } - @Udf(description = "Returns a string that is a substring of this string. The" - + " substring begins with the character at the specified startIndex and" - + " extends to the character at endIndex -1.") + @Udf(description = "Returns a substring of str that starts at pos and is of length len") public String substring( - @UdfParameter("value") final String value, - @UdfParameter(value = "startIndex", - description = "The zero-based start index, inclusive.") final int startIndex, - @UdfParameter(value = "endIndex", - description = "The zero-based end index, exclusive.") final int endIndex) { - return value.substring(startIndex, endIndex); + @UdfParameter(value = "str", + description = "The source string. If null, then function returns null.") final String str, + @UdfParameter(value = "pos", + description = "The base-one position the substring starts from." + + " If null, then function returns null." + + " (If in legacy mode, this argument is base-zero)") final Integer pos, + @UdfParameter(value = "len", + description = "The length of the substring to extract." + + " If null, then function returns null." + + " (If in legacy mode, this argument is the endIndex (exclusive)," + + " rather than the length") final Integer length) { + return impl.substring(str, pos, length); + } + + private interface Impl { + String substring(String value, Integer pos); + + String substring(String value, Integer pos, Integer length); + } + + private static final class LegacyImpl implements Impl { + public String substring(final String value, final Integer startIndex) { + Objects.requireNonNull(startIndex, "startIndex"); + return value.substring(startIndex); + } + + public String substring(final String value, final Integer startIndex, final Integer endIndex) { + Objects.requireNonNull(startIndex, "startIndex"); + Objects.requireNonNull(endIndex, "endIndex"); + return value.substring(startIndex, endIndex); + } + } + + private static final class CurrentImpl implements Impl { + public String substring(final String str, final Integer pos) { + if (str == null || pos == null) { + return null; + } + final int start = getStartIndex(str, pos); + return str.substring(start); + } + + public String substring(final String str, final Integer pos, final Integer length) { + if (str == null || pos == null || length == null) { + return null; + } + final int start = getStartIndex(str, pos); + final int end = getEndIndex(str, start, length); + return str.substring(start, end); + } + + private static int getStartIndex(final String value, final Integer pos) { + return pos < 0 + ? Math.max(value.length() + pos, 0) + : Math.max(Math.min(pos - 1, value.length()), 0); + } + + private static int getEndIndex(final String value, final int start, final int length) { + return Math.max(Math.min(start + length, value.length()), start); + } + } + + @SuppressWarnings("SameParameterValue") + private static boolean getProps( + final Map props, + final String name, + final boolean defaultValue) { + + final Object value = props.get(name); + if (value == null) { + return defaultValue; + } + + if (value instanceof String) { + return Boolean.valueOf((String)value); + } + + if (!(value instanceof Boolean)) { + throw new ConfigException(name, value, "Value is not boolean"); + } + + return (Boolean)value; } -} +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java index 632a2c7f47f8..2a2f3fa35e2e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/codegen/CodeGenRunnerTest.java @@ -423,7 +423,7 @@ public void shouldHandleRandomUdf() { public void shouldHandleStringUdfs() { // Given: final String query = - "SELECT LCASE(col1), UCASE(col1), TRIM(col1), CONCAT(col1,'_test'), SUBSTRING(col1, 1, 3)" + "SELECT LCASE(col1), UCASE(col1), TRIM(col1), CONCAT(col1,'_test'), SUBSTRING(col1, 2, 4)" + " FROM codegen_test;"; final Map inputValues = ImmutableMap.of(1, " Hello "); @@ -432,7 +432,7 @@ public void shouldHandleStringUdfs() { final List columns = executeExpression(query, inputValues); // Then: - assertThat(columns, contains(" hello ", " HELLO ", "Hello", " Hello _test", "He")); + assertThat(columns, contains(" hello ", " HELLO ", "Hello", " Hello _test", "Hell")); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java index a21d7b4c08ef..9ecabcb58f5b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/UdfLoaderTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.function.udf.UdfParameter; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.connect.data.Schema; @@ -81,11 +82,11 @@ public void shouldLoadFunctionsInKsqlEngine() { final Kudf substring1 = function.getFunction( Arrays.asList(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)).newInstance(ksqlConfig); - assertThat(substring1.evaluate("foo", 1), equalTo("oo")); + assertThat(substring1.evaluate("foo", 2), equalTo("oo")); final Kudf substring2 = function.getFunction( Arrays.asList(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA, Schema.INT32_SCHEMA)).newInstance(ksqlConfig); - assertThat(substring2.evaluate("foo", 1,2), equalTo("o")); + assertThat(substring2.evaluate("foo", 2, 1), equalTo("o")); } @SuppressWarnings("unchecked") @@ -141,15 +142,15 @@ public void shouldCreateUdfFactoryWithInternalPathWhenInternal() { @Test public void shouldSupportUdfParameterAnnotation() { - final UdfFactory substring = metaStore.getUdfFactory("substring"); + final UdfFactory substring = metaStore.getUdfFactory("somefunction"); final KsqlFunction function = substring.getFunction( - ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.INT32_SCHEMA, Schema.INT32_SCHEMA)); + ImmutableList.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)); final List arguments = function.getArguments(); - assertThat(arguments.get(0).name(), is("value")); + assertThat(arguments.get(0).name(), is("justValue")); assertThat(arguments.get(0).doc(), is("")); - assertThat(arguments.get(1).name(), is("startIndex")); - assertThat(arguments.get(1).doc(), is("The zero-based start index, inclusive.")); + assertThat(arguments.get(1).name(), is("valueAndDescription")); + assertThat(arguments.get(1).doc(), is("Some description")); } @Test @@ -313,4 +314,18 @@ public int foo(final int bar) { return bar; } } + + @SuppressWarnings("unused") // Invoked via reflection in test. + @UdfDescription( + name = "SomeFunction", + description = "A test-only UDF for testing configure() is called") + public static class SomeFunctionUdf { + @Udf + public int foo( + @UdfParameter("justValue") final String v1, + @UdfParameter(value = "valueAndDescription", + description = "Some description") final String v2) { + return 0; + } + } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java new file mode 100644 index 000000000000..1b1fbf70aeb1 --- /dev/null +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/string/SubstringTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.ksql.function.udf.string; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableMap; +import io.confluent.common.config.ConfigException; +import io.confluent.ksql.util.KsqlConfig; +import org.junit.Before; +import org.junit.Test; + +public class SubstringTest { + + private Substring udf; + + @Before + public void setUp() { + udf = new Substring(); + } + + @Test + public void shouldReturnNullOnNullValue() { + assertThat(udf.substring(null, 1), is(nullValue())); + assertThat(udf.substring(null, 1, 1), is(nullValue())); + assertThat(udf.substring("some string", null, 1), is(nullValue())); + assertThat(udf.substring("some string", 1, null), is(nullValue())); + } + + @Test + public void shouldUseOneBasedIndexing() { + assertThat(udf.substring("a test string", 1, 1), is("a")); + assertThat(udf.substring("a test string", -1, 1), is("g")); + } + + @Test + public void shouldExtractFromStartForPositivePositions() { + assertThat(udf.substring("a test string", 3), is("test string")); + assertThat(udf.substring("a test string", 3, 4), is("test")); + } + + @Test + public void shouldExtractFromEndForNegativePositions() { + assertThat(udf.substring("a test string", -6), is("string")); + assertThat(udf.substring("a test string", -6, 2), is("st")); + } + + @Test + public void shouldTruncateOutOfBoundIndexes() { + assertThat(udf.substring("a test string", 0), is("a test string")); + assertThat(udf.substring("a test string", 100), is("")); + assertThat(udf.substring("a test string", -100), is("a test string")); + assertThat(udf.substring("a test string", 3, 100), is("test string")); + assertThat(udf.substring("a test string", 3, -100), is("")); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNPEOnNullValueWithTwoArgs() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring(null, 0); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNPEOnNullValueWithThreeArgs() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring(null, 0, 0); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNPEOnNullStartIndexWithTwoArgs() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("some-string", null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNPEOnNullStartIndexWithThreeArgs() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("some-string", null, 0); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNPEOnNullEndIndex() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("some-string", 1, null); + } + + @Test + public void shouldUseZeroBasedIndexingIfInLegacyMode() { + // Given: + givenInLegacyMode(); + + // Then: + assertThat(udf.substring("a test string", 0, 1), is("a")); + } + + @Test(expected = StringIndexOutOfBoundsException.class) + public void shouldThrowInLegacyModeIfStartIndexIsNegative() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("a test string", -1, 1); + } + + @Test + public void shouldExtractFromStartInLegacyMode() { + // Given: + givenInLegacyMode(); + + // Then: + assertThat(udf.substring("a test string", 2), is("test string")); + assertThat(udf.substring("a test string", 2, 6), is("test")); + } + + @Test(expected = StringIndexOutOfBoundsException.class) + public void shouldThrowInLegacyModeIfEndIndexIsLessThanStartIndex() { + // Given: + givenInLegacyMode(); + + // Then: + assertThat(udf.substring("a test string", 4, 2), is("st")); + } + + @Test(expected = StringIndexOutOfBoundsException.class) + public void shouldThrowInLegacyModeIfStartIndexOutOfBounds() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("a test string", 100); + } + + @Test(expected = StringIndexOutOfBoundsException.class) + public void shouldThrowInLegacyModeIfEndIndexOutOfBounds() { + // Given: + givenInLegacyMode(); + + // Then: + udf.substring("a test string", 3, 100); + } + + @Test + public void shouldNotEnterLegacyModeIfConfigMissing() { + // When: + udf.configure(ImmutableMap.of()); + + // Then: + assertThat(udfIsInLegacyMode(), is(false)); + } + + @Test + public void shouldEnterLegacyModeWithTrueStringConfig() { + // When: + configure("true"); + + // Then: + assertThat(udfIsInLegacyMode(), is(true)); + } + + @Test + public void shouldEnterLegacyModeWithTrueBooleanConfig() { + // When: + configure(true); + + // Then: + assertThat(udfIsInLegacyMode(), is(true)); + } + + @Test + public void shouldNotEnterLegacyModeWithFalseStringConfig() { + // When: + configure("false"); + + // Then: + assertThat(udfIsInLegacyMode(), is(false)); + } + + @Test + public void shouldNotEnterLegacyModeWithFalseBooleanConfig() { + // When: + configure(false); + + // Then: + assertThat(udfIsInLegacyMode(), is(false)); + } + + @Test + public void shouldNotEnterLegacyModeWithOtherStringConfig() { + // When: + configure("what ever"); + + // Then: + assertThat(udfIsInLegacyMode(), is(false)); + } + + @Test(expected = ConfigException.class) + public void shouldThrowOnInvalidLegacyModeValueType() { + configure(1.0); + } + + private boolean udfIsInLegacyMode() { + // In legacy mode an NPE is thrown on null args: + try { + udf.substring(null, null); + return false; + } catch (final NullPointerException e) { + return true; + } + } + + private void givenInLegacyMode() { + configure(true); + } + + private void configure(final Object legacyMode) { + udf.configure(ImmutableMap.of(KsqlConfig.KSQL_FUNCTIONS_SUBSTRING_LEGACY_ARGS_CONFIG, legacyMode)); + } +} \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java index 0d4f48d340af..917ff4519878 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/UdfIntTest.java @@ -181,7 +181,7 @@ private void testShouldCastSelectedColumns(String resultStreamName, throws Exception { final String selectColumns = " CAST (ORDERUNITS AS INTEGER), CAST( PRICEARRAY[1]>1000 AS STRING), CAST (SUBSTRING" - + "(ITEMID, 5) AS DOUBLE), CAST(ORDERUNITS AS VARCHAR) "; + + "(ITEMID, 6) AS DOUBLE), CAST(ORDERUNITS AS VARCHAR) "; final String queryString = String.format( "CREATE STREAM %s AS SELECT %s FROM %s WHERE %s;", diff --git a/ksql-engine/src/test/resources/query-validation-tests/substring.json b/ksql-engine/src/test/resources/query-validation-tests/substring.json new file mode 100644 index 000000000000..9c34d1c3a1d5 --- /dev/null +++ b/ksql-engine/src/test/resources/query-validation-tests/substring.json @@ -0,0 +1,85 @@ +{ + "comments": [ + "Tests covering the use of the SUBSTRING function" + ], + "tests": [ + { + "name": "do substring with just pos", + "format": ["JSON"], + "statements": [ + "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"source": "some_string"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "another"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "short"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"SUBSTRING":"string", "NULL_STR":null, "NULL_POS":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"er", "NULL_STR":null, "NULL_POS":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_POS":null}, "timestamp": 0} + ] + }, + { + "name": "do substring with pos and length", + "format": ["JSON"], + "statements": [ + "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 6, 3) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_POS, SUBSTRING(source, 6, null) AS NULL_LEN FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"source": "some_string"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "another"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "short"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"SUBSTRING":"str", "NULL_STR":null, "NULL_POS":null, "NULL_LEN":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"er", "NULL_STR":null, "NULL_POS":null, "NULL_LEN":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_POS":null, "NULL_LEN":null}, "timestamp": 0} + ] + }, + { + "name": "do substring in legacy mode with just start index", + "format": ["JSON"], + "properties": { + "ksql.functions.substring.legacy.args": true + }, + "statements": [ + "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 5) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_START FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"source": "some_string"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "another"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "short"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"SUBSTRING":"string", "NULL_STR":null, "NULL_START":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"er", "NULL_STR":null, "NULL_START":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"", "NULL_STR":null, "NULL_START":null}, "timestamp": 0} + ] + }, + { + "name": "do substring in legacy mode with start and end indexes", + "format": ["JSON"], + "properties": { + "ksql.functions.substring.legacy.args": true + }, + "statements": [ + "CREATE STREAM TEST (source VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT SUBSTRING(source, 1, 6) AS SUBSTRING, SUBSTRING(null, 1) AS NULL_STR, SUBSTRING(source, null) AS NULL_START, SUBSTRING(source, 6, null) AS NULL_END FROM TEST;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1, "value": {"source": "some_string"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "another"}, "timestamp": 0}, + {"topic": "test_topic", "key": 2, "value": {"source": "short"}, "timestamp": 0} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"SUBSTRING":"ome_s", "NULL_STR":null, "NULL_START":null, "NULL_END":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":"nothe", "NULL_STR":null, "NULL_START":null, "NULL_END":null}, "timestamp": 0}, + {"topic": "OUTPUT", "key": 2, "value": {"SUBSTRING":null, "NULL_STR":null, "NULL_START":null, "NULL_END":null}, "timestamp": 0} + ] + } + ] +} \ No newline at end of file