fix: report clearer error message when AVG used with DELIMITED (#4295)
See #4294

`AVG` doesn't work with the `DELIMITED` format: the function's intermediate aggregate state is a `STRUCT<SUM, COUNT>`, which `DELIMITED` cannot serialize. On top of that, the error message isn't great.

Example statements that cause the error:

```sql
-- Given:
CREATE STREAM INPUT (VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED');

-- When:
CREATE TABLE OUTPUT AS SELECT avg(value) AS avg FROM INPUT group by ROWKEY;
```

Old error message:

```
ksql> CREATE TABLE OUTPUT AS SELECT avg(value) AS avg FROM INPUT group by ROWKEY;
CREATE TABLE OUTPUT AS SELECT avg(value)Value format does not support value schema.
format: DELIMITED
schema: Persistence{schema=STRUCT<KSQL_INTERNAL_COL_0 INT, KSQL_INTERNAL_COL_1 VARCHAR, KSQL_AGG_VARIABLE_0 STRUCT<SUM INT, COUNT BIGINT>> NOT NULL, unwrapped=false}
reason: The 'DELIMITED' format does not support type 'STRUCT'
Caused by: The 'DELIMITED' format does not support type 'STRUCT'
```

This PR improves the error message: the serde factories now throw a new `SchemaNotSupportedException`, and `AggregateBuilderUtils` catches it when building the key and value serdes for the aggregation's internal state, rethrowing with a clearer explanation.

New error message:

```
One of the functions used in the statement has an intermediate type that the value format can not handle. Please remove the function or change the format.
Consider up-voting #3950, which will resolve this limitation
Caused by: Value format does not support value schema.
format: DELIMITED
schema:
	Persistence{schema=STRUCT<KSQL_INTERNAL_COL_0 INT, KSQL_INTERNAL_COL_1 VARCHAR,
	KSQL_AGG_VARIABLE_0 STRUCT<SUM INT, COUNT BIGINT>> NOT NULL,
	unwrapped=false}
reason: The 'DELIMITED' format does not support type 'STRUCT'
Caused by: The 'DELIMITED' format does not support type 'STRUCT'
```
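
As the new message suggests, the simplest workaround is to switch to a value format that can represent the `STRUCT` used for `AVG`'s intermediate state, such as `JSON`. A minimal sketch (not part of this change), reusing the example stream above and assuming the topic's data is produced as JSON:

```sql
-- Recreate the stream with a value format that supports STRUCT:
CREATE STREAM INPUT (VALUE integer) WITH (kafka_topic='test_topic', value_format='JSON');

-- The aggregation now builds, because JSON can serialize the internal
-- STRUCT<SUM, COUNT> aggregate state:
CREATE TABLE OUTPUT AS SELECT avg(value) AS avg FROM INPUT group by ROWKEY;
```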
big-andy-coates authored Jan 13, 2020
1 parent 7a83bbf commit 307bf4d
Showing 7 changed files with 111 additions and 20 deletions.
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql;

import io.confluent.ksql.util.KsqlException;

/**
* Thrown to indicate the schema is not supported.
*/
public class SchemaNotSupportedException extends KsqlException {

public SchemaNotSupportedException(final String message, final Throwable cause) {
super(message, cause);
}
}
@@ -77,6 +77,18 @@
{"topic": "OUTPUT", "key": "bob", "value": {"AVG": -66666.1}},
{"topic": "OUTPUT", "key": "alice", "value": {"AVG": 3.074457345651058E12}}
]
},
{
"name": "average - DELIMITED",
"comment": "DELIMITED does not support STRUCT, so can't support AVG until we use a different internal format",
"statements": [
"CREATE STREAM INPUT (VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED');",
"CREATE TABLE OUTPUT AS SELECT avg(value) AS avg FROM INPUT group by ROWKEY;"
],
"expectedException": {
"type": "io.confluent.ksql.SchemaNotSupportedException",
"message": "One of the functions used in the statement has an intermediate type that the value format can not handle. Please remove the function or change the format."
}
}
]
}
@@ -20,12 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Collections;
import java.util.Map;
@@ -116,7 +116,7 @@ private <T> Serde<Struct> createInner(
try {
serdeFactories.validate(format, schema);
} catch (final Exception e) {
throw new KsqlException("Key format does not support key schema."
throw new SchemaNotSupportedException("Key format does not support key schema."
+ System.lineSeparator()
+ "format: " + format.getFormat()
+ System.lineSeparator()
@@ -21,12 +21,12 @@
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Collections;
@@ -106,7 +106,7 @@ private <T> Serde<GenericRow> from(
try {
serdeFactories.validate(format, schema);
} catch (final Exception e) {
throw new KsqlException("Value format does not support value schema."
throw new SchemaNotSupportedException("Value format does not support value schema."
+ System.lineSeparator()
+ "format: " + format.getFormat()
+ System.lineSeparator()
@@ -25,6 +25,7 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
@@ -33,7 +34,6 @@
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.GenericKeySerDe.UnwrappedKeySerializer;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
@@ -122,7 +122,7 @@ public void shouldValidateFormatCanHandleSchema() {
.when(serdeFactories).validate(FORMAT, WRAPPED_SCHEMA);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expect(SchemaNotSupportedException.class);
expectedException.expectMessage("Key format does not support key schema."
+ System.lineSeparator()
+ "format: JSON"
@@ -27,12 +27,12 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
@@ -131,7 +131,7 @@ public void shouldValidateFormatCanHandleSchema() {
.when(serdesFactories).validate(FORMAT, MUTLI_FIELD_SCHEMA);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expect(SchemaNotSupportedException.class);
expectedException.expectMessage("Value format does not support value schema."
+ System.lineSeparator()
+ "format: JSON"
@@ -16,6 +16,7 @@
package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.SchemaNotSupportedException;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryContext.Stacker;
@@ -32,6 +33,7 @@
import org.apache.kafka.streams.state.KeyValueStore;

final class AggregateBuilderUtils {

private static final String MATERIALIZE_OP = "Materialize";
private static final String WINDOW_SELECT_OP = "WindowSelect";
private static final String TO_OUTPUT_SCHEMA_OP = "ToOutputSchema";
@@ -62,23 +64,23 @@ static Materialized<Struct, GenericRow, KeyValueStore<Bytes, byte[]>> buildMater
final LogicalSchema aggregateSchema,
final Formats formats,
final KsqlQueryBuilder queryBuilder,
final MaterializedFactory materializedFactory) {
final MaterializedFactory materializedFactory
) {
final PhysicalSchema physicalAggregationSchema = PhysicalSchema.from(
aggregateSchema,
formats.getOptions()
);

final QueryContext queryContext = materializeContext(step);
final Serde<Struct> keySerde = queryBuilder.buildKeySerde(
formats.getKeyFormat(),
physicalAggregationSchema,
queryContext
);
final Serde<GenericRow> valueSerde = queryBuilder.buildValueSerde(
formats.getValueFormat(),
physicalAggregationSchema,
queryContext
);
return materializedFactory.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext));

final Serde<Struct> keySerde =
buildKeySerde(formats, queryBuilder, physicalAggregationSchema, queryContext);

final Serde<GenericRow> valueSerde =
buildValueSerde(formats, queryBuilder, physicalAggregationSchema, queryContext);

return materializedFactory
.create(keySerde, valueSerde, StreamsUtil.buildOpName(queryContext));
}

static MaterializationInfo.Builder materializationInfoBuilder(
@@ -91,4 +93,53 @@ static MaterializationInfo.Builder materializationInfoBuilder(
return MaterializationInfo.builder(StreamsUtil.buildOpName(queryContext), aggregationSchema)
.map(pl -> aggregator.getResultMapper(), outputSchema, queryContext);
}

private static Serde<Struct> buildKeySerde(
final Formats formats,
final KsqlQueryBuilder queryBuilder,
final PhysicalSchema physicalAggregationSchema,
final QueryContext queryContext
) {
try {
return queryBuilder.buildKeySerde(
formats.getKeyFormat(),
physicalAggregationSchema,
queryContext
);
} catch (final SchemaNotSupportedException e) {
throw schemaNotSupportedException(e, "key");
}
}

private static Serde<GenericRow> buildValueSerde(
final Formats formats,
final KsqlQueryBuilder queryBuilder,
final PhysicalSchema physicalAggregationSchema,
final QueryContext queryContext
) {
try {
return queryBuilder.buildValueSerde(
formats.getValueFormat(),
physicalAggregationSchema,
queryContext
);
} catch (final SchemaNotSupportedException e) {
throw schemaNotSupportedException(e, "value");
}
}

private static SchemaNotSupportedException schemaNotSupportedException(
final SchemaNotSupportedException e,
final String type
) {
return new SchemaNotSupportedException(
"One of the functions used in the statement has an intermediate type that the "
+ type + " format can not handle. "
+ "Please remove the function or change the format."
+ System.lineSeparator()
+ "Consider up-voting https://github.com/confluentinc/ksql/issues/3950, "
+ "which will resolve this limitation",
e
);
}
}
