From b537f708bbb64e2c264203e2e44d5313d2064c4d Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 23 Jan 2020 09:03:44 -0800 Subject: [PATCH] fix: delimiters reset across non-delimited types (reverts #4366) --- .../io/confluent/ksql/serde/FormatInfo.java | 5 +++++ .../confluent/ksql/serde/FormatInfoTest.java | 20 +++++++++++++++++++ .../io/confluent/ksql/analyzer/Analyzer.java | 10 ++++++++++ .../query-validation-tests/delimited.json | 18 +++++++++++++++++ 4 files changed, 53 insertions(+) diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java index be89be01e126..9a8c5fbb5a24 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java @@ -62,6 +62,11 @@ private FormatInfo( } this.delimiter = Objects.requireNonNull(delimiter, "delimiter"); + + if (format != Format.DELIMITED && delimiter.isPresent()) { + throw new KsqlException("Delimiter only supported with DELIMITED format"); + } + } public Format getFormat() { diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java index fd3554b23ef1..2e410950a5ad 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java @@ -123,4 +123,24 @@ public void shouldGetAvroSchemaName() { assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getFullSchemaName(), is(Optional.empty())); } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithAvroFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x'))); + } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithJsonFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x'))); + } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index d5d0c29786d6..8a866b9fb6ca 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -258,6 +258,16 @@ private Format getValueFormat(final Sink sink) { } private Optional getValueDelimiter(final Sink sink) { + if (getValueFormat(sink) != Format.DELIMITED) { + // the delimiter is not inherited across non-delimited types + // (e.g. if source A is DELIMITED with |, and I create sink B + // with JSON from A and then sink C with DELIMITED from B, C + // will use the default limiter, as opposed to |) + // see https://github.com/confluentinc/ksql/issues/4368 for + // more context + return Optional.empty(); + } + if (sink.getProperties().getValueDelimiter().isPresent()) { return sink.getProperties().getValueDelimiter(); } diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json index dd6364057c1e..935020433f74 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -165,6 +165,24 @@ {"topic": "S2", "key": "100", "value": "100\t100\t500", "timestamp": 0}, {"topic": "S2", "key": "100", "value": "100\t100\t100", "timestamp": 0} ] + }, + { + "name": "validate cannot specify delimiter with json format", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');" + ], + "topics": [ + { + "name": "test_topic", + "format": "JSON" + } + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Delimiter only supported with DELIMITED format" + }, + "inputs": [], + "outputs": [] } ] } \ No newline at end of file