diff --git a/ksql-common/src/main/java/io/confluent/ksql/SchemaNotSupportedException.java b/ksql-common/src/main/java/io/confluent/ksql/SchemaNotSupportedException.java new file mode 100644 index 000000000000..2c2954b269a0 --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/SchemaNotSupportedException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql; + +import io.confluent.ksql.util.KsqlException; + +/** + * Thrown to indicate the schema is not supported. + */ +public class SchemaNotSupportedException extends KsqlException { + + public SchemaNotSupportedException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/average-udaf.json b/ksql-functional-tests/src/test/resources/query-validation-tests/average-udaf.json index 940de096c401..41bec458d0c8 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/average-udaf.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/average-udaf.json @@ -77,6 +77,18 @@ {"topic": "OUTPUT", "key": "bob", "value": {"AVG": -66666.1}}, {"topic": "OUTPUT", "key": "alice", "value": {"AVG": 3.074457345651058E12}} ] + }, + { + "name": "average - DELIMITED", + "comment": "DELIMITED does not support STRUCT, so can't support AVG until we use a different internal format", + "statements": [ + "CREATE STREAM INPUT (VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT AS SELECT avg(value) AS avg FROM INPUT group by ROWKEY;" + ], + "expectedException": { + "type": "io.confluent.ksql.SchemaNotSupportedException", + "message": "One of the functions used in the statement has an intermediate type that the value format can not handle. Please remove the function or change the format." + } } ] } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericKeySerDe.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericKeySerDe.java index 3c4a20584757..7b3a3de4e0a5 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericKeySerDe.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericKeySerDe.java @@ -20,12 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.SchemaNotSupportedException; import io.confluent.ksql.logging.processing.LoggingDeserializer; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; import java.util.Collections; import java.util.Map; @@ -116,7 +116,7 @@ private Serde createInner( try { serdeFactories.validate(format, schema); } catch (final Exception e) { - throw new KsqlException("Key format does not support key schema." + throw new SchemaNotSupportedException("Key format does not support key schema." + System.lineSeparator() + "format: " + format.getFormat() + System.lineSeparator() diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericRowSerDe.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericRowSerDe.java index a60b6e9ac8c8..e30810eceb0f 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericRowSerDe.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/GenericRowSerDe.java @@ -21,12 +21,12 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.SchemaNotSupportedException; import io.confluent.ksql.logging.processing.LoggingDeserializer; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; import java.util.ArrayList; import java.util.Collections; @@ -106,7 +106,7 @@ private Serde from( try { serdeFactories.validate(format, schema); } catch (final Exception e) { - throw new KsqlException("Value format does not support value schema." + throw new SchemaNotSupportedException("Value format does not support value schema." + System.lineSeparator() + "format: " + format.getFormat() + System.lineSeparator() diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericKeySerDeTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericKeySerDeTest.java index 76c406b67b84..3b842ba774ea 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericKeySerDeTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericKeySerDeTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.SchemaNotSupportedException; import io.confluent.ksql.logging.processing.LoggingDeserializer; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; @@ -33,7 +34,6 @@ import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.serde.GenericKeySerDe.UnwrappedKeySerializer; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import java.time.Duration; import java.util.Collections; import java.util.Optional; @@ -122,7 +122,7 @@ public void shouldValidateFormatCanHandleSchema() { .when(serdeFactories).validate(FORMAT, WRAPPED_SCHEMA); // Expect: - expectedException.expect(KsqlException.class); + expectedException.expect(SchemaNotSupportedException.class); expectedException.expectMessage("Key format does not support key schema." + System.lineSeparator() + "format: JSON" diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java index 2a9c484fb0e4..1a00523808c3 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java @@ -27,12 +27,12 @@ import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.SchemaNotSupportedException; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.logging.processing.ProcessingLoggerFactory; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; @@ -131,7 +131,7 @@ public void shouldValidateFormatCanHandleSchema() { .when(serdesFactories).validate(FORMAT, MUTLI_FIELD_SCHEMA); // Expect: - expectedException.expect(KsqlException.class); + expectedException.expect(SchemaNotSupportedException.class); expectedException.expectMessage("Value format does not support value schema." + System.lineSeparator() + "format: JSON" diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java index c39ac1a1c28f..3f39217f632e 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/AggregateBuilderUtils.java @@ -16,6 +16,7 @@ package io.confluent.ksql.execution.streams; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.SchemaNotSupportedException; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryContext.Stacker; @@ -32,6 +33,7 @@ import org.apache.kafka.streams.state.KeyValueStore; final class AggregateBuilderUtils { + private static final String MATERIALIZE_OP = "Materialize"; private static final String WINDOW_SELECT_OP = "WindowSelect"; private static final String TO_OUTPUT_SCHEMA_OP = "ToOutputSchema"; @@ -62,23 +64,23 @@ static Materialized> buildMater final LogicalSchema aggregateSchema, final Formats formats, final KsqlQueryBuilder queryBuilder, - final MaterializedFactory materializedFactory) { + final MaterializedFactory materializedFactory + ) { final PhysicalSchema physicalAggregationSchema = PhysicalSchema.from( aggregateSchema, formats.getOptions() ); + final QueryContext queryContext = materializeContext(step); - final Serde keySerde = queryBuilder.buildKeySerde( - formats.getKeyFormat(), - physicalAggregationSchema, - queryContext - ); - final Serde valueSerde = queryBuilder.buildValueSerde( - formats.getValueFormat(), - physicalAggregationSchema, - queryContext - ); - return materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)); + + final Serde keySerde = + buildKeySerde(formats, queryBuilder, physicalAggregationSchema, queryContext); + + final Serde valueSerde = + buildValueSerde(formats, queryBuilder, physicalAggregationSchema, queryContext); + + return materializedFactory + .create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext)); } static MaterializationInfo.Builder materializationInfoBuilder( @@ -91,4 +93,53 @@ static MaterializationInfo.Builder materializationInfoBuilder( return MaterializationInfo.builder(StreamsUtil.buildOpName(queryContext), aggregationSchema) .map(pl -> aggregator.getResultMapper(), outputSchema, queryContext); } + + private static Serde buildKeySerde( + final Formats formats, + final KsqlQueryBuilder queryBuilder, + final PhysicalSchema physicalAggregationSchema, + final QueryContext queryContext + ) { + try { + return queryBuilder.buildKeySerde( + formats.getKeyFormat(), + physicalAggregationSchema, + queryContext + ); + } catch (final SchemaNotSupportedException e) { + throw schemaNotSupportedException(e, "key"); + } + } + + private static Serde buildValueSerde( + final Formats formats, + final KsqlQueryBuilder queryBuilder, + final PhysicalSchema physicalAggregationSchema, + final QueryContext queryContext + ) { + try { + return queryBuilder.buildValueSerde( + formats.getValueFormat(), + physicalAggregationSchema, + queryContext + ); + } catch (final SchemaNotSupportedException e) { + throw schemaNotSupportedException(e, "value"); + } + } + + private static SchemaNotSupportedException schemaNotSupportedException( + final SchemaNotSupportedException e, + final String type + ) { + return new SchemaNotSupportedException( + "One of the functions used in the statement has an intermediate type that the " + + type + " format can not handle. " + + "Please remove the function or change the format." + + System.lineSeparator() + + "Consider up-voting https://github.com/confluentinc/ksql/issues/3950, " + + "which will resolve this limitation", + e + ); + } }