diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java index be89be01e126..9a8c5fbb5a24 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java @@ -62,6 +62,11 @@ private FormatInfo( } this.delimiter = Objects.requireNonNull(delimiter, "delimiter"); + + if (format != Format.DELIMITED && delimiter.isPresent()) { + throw new KsqlException("Delimiter only supported with DELIMITED format"); + } + } public Format getFormat() { diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java index fd3554b23ef1..2e410950a5ad 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java @@ -123,4 +123,24 @@ public void shouldGetAvroSchemaName() { assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getFullSchemaName(), is(Optional.empty())); } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithAvroFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x'))); + } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithJsonFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x'))); + } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index d5d0c29786d6..525254a711c1 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -258,6 +258,16 @@ private Format getValueFormat(final Sink sink) { } private Optional getValueDelimiter(final Sink sink) { + if (getValueFormat(sink) != Format.DELIMITED) { + // the delimiter is not inherited across non-delimited types + // (e.g. if source A is DELIMITED with |, and I create sink B + // with JSON from A and then sink C with DELIMITED from B, C + // will use the default delimiter, as opposed to |) + // see https://github.com/confluentinc/ksql/issues/4368 for + // more context + return Optional.empty(); + } + if (sink.getProperties().getValueDelimiter().isPresent()) { return sink.getProperties().getValueDelimiter(); } diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json index dd6364057c1e..935020433f74 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -165,6 +165,24 @@ {"topic": "S2", "key": "100", "value": "100\t100\t500", "timestamp": 0}, {"topic": "S2", "key": "100", "value": "100\t100\t100", "timestamp": 0} ] + }, + { + "name": "validate cannot specify delimiter with json format", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');" + ], + "topics": [ + { + "name": "test_topic", + "format": "JSON" + } + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Delimiter only supported with DELIMITED format" + }, + "inputs": [], + "outputs": [] } ] } \ No newline at end of file