diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java index 990f2e3ef005..be89be01e126 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java @@ -62,11 +62,6 @@ private FormatInfo( } this.delimiter = Objects.requireNonNull(delimiter, "delimiter"); - - if (format != Format.DELIMITED && delimiter.isPresent()) { - throw new KsqlException("Delimeter only supported with DELIMITED format"); - } - } public Format getFormat() { diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java index a40d8503b9ec..fd3554b23ef1 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java @@ -123,24 +123,4 @@ public void shouldGetAvroSchemaName() { assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getFullSchemaName(), is(Optional.empty())); } - - @Test - public void shouldThrowWhenAttemptingToUseValueDelimeterWithAvroFormat() { - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Delimeter only supported with DELIMITED format"); - - // When: - FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x'))); - } - - @Test - public void shouldThrowWhenAttemptingToUseValueDelimeterWithJsonFormat() { - // Then: - expectedException.expect(KsqlException.class); - expectedException.expectMessage("Delimeter only supported with DELIMITED format"); - - // When: - FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x'))); - } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 1cffda3855da..d5d0c29786d6 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -261,6 +261,7 @@ private Optional getValueDelimiter(final Sink sink) { if (sink.getProperties().getValueDelimiter().isPresent()) { return sink.getProperties().getValueDelimiter(); } + return analysis .getFromDataSources() .get(0) diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json index 2868824d0357..dd6364057c1e 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -22,6 +22,20 @@ {"topic": "S2", "key": "100", "value": "100,100,100", "timestamp": 0} ] }, + { + "name": "select delimited value_format into another format", + "format": ["JSON", "AVRO"], + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=',');", + "CREATE STREAM S2 WITH(value_format='{FORMAT}') as SELECT id, name, value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": "0", "value": "0,zero,0", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": "0", "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0} + ] + }, { "name": "validate value_delimiter to be single character", "statements": [ @@ -35,7 +49,7 @@ "outputs": [] }, { - "name": "validate delimeter is not empty", + "name": "validate delimiter is not empty", "statements": [ "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='');" ], @@ -53,7 +67,7 @@ "outputs": [] }, { - "name": "validate delimeter is not a space", + "name": "validate delimiter is not a space", "statements": [ "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=' ');" ], @@ -65,7 +79,7 @@ "outputs": [] }, { - "name": "validate delimeter is not a tab character", + "name": "validate delimiter is not a tab character", "statements": [ "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='\t');" ], @@ -151,24 +165,6 @@ {"topic": "S2", "key": "100", "value": "100\t100\t500", "timestamp": 0}, {"topic": "S2", "key": "100", "value": "100\t100\t100", "timestamp": 0} ] - }, - { - "name": "validate cannot specify delimeter with json format", - "statements": [ - "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');" - ], - "topics": [ - { - "name": "test_topic", - "format": "JSON" - } - ], - "expectedException": { - "type": "io.confluent.ksql.util.KsqlStatementException", - "message": "Delimeter only supported with DELIMITED format" - }, - "inputs": [], - "outputs": [] } ] } \ No newline at end of file diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java index f292aa7d04ff..39ef57a4a734 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java @@ -217,16 +217,16 @@ public void shouldSerializeNegativeDecimalWithPaddedZeros() { } @Test - public void shouldSerializeRowCorrectlyWithTabDelimeter() { - shouldSerializeRowCorrectlyWithNonDefaultDelimeter('\t'); + public void shouldSerializeRowCorrectlyWithTabDelimiter() { + shouldSerializeRowCorrectlyWithNonDefaultDelimiter('\t'); } @Test - public void shouldSerializeRowCorrectlyWithBarDelimeter() { - shouldSerializeRowCorrectlyWithNonDefaultDelimeter('|'); + public void shouldSerializeRowCorrectlyWithBarDelimiter() { + shouldSerializeRowCorrectlyWithNonDefaultDelimiter('|'); } - private void shouldSerializeRowCorrectlyWithNonDefaultDelimeter(final char delimiter) { + private void shouldSerializeRowCorrectlyWithNonDefaultDelimiter(final char delimiter) { // Given: final Struct data = new Struct(SCHEMA) .put("ORDERTIME", 1511897796092L)