Skip to content

Commit

Permalink
fix: delimiters reset across non-delimited types (reverts confluentin…
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jan 23, 2020
1 parent a2b69f7 commit b537f70
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ private FormatInfo(
}

this.delimiter = Objects.requireNonNull(delimiter, "delimiter");

if (format != Format.DELIMITED && delimiter.isPresent()) {
throw new KsqlException("Delimiter only supported with DELIMITED format");
}

}

public Format getFormat() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,24 @@ public void shouldGetAvroSchemaName() {
assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getFullSchemaName(),
is(Optional.empty()));
}

@Test
public void shouldThrowWhenAttemptingToUseValueDelimiterWithAvroFormat() {
// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Delimiter only supported with DELIMITED format");

// When:
FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x')));
}

@Test
public void shouldThrowWhenAttemptingToUseValueDelimiterWithJsonFormat() {
// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Delimiter only supported with DELIMITED format");

// When:
FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x')));
}
}
10 changes: 10 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ private Format getValueFormat(final Sink sink) {
}

private Optional<Delimiter> getValueDelimiter(final Sink sink) {
if (getValueFormat(sink) != Format.DELIMITED) {
// the delimiter is not inherited across non-delimited types
// (e.g. if source A is DELIMITED with |, and I create sink B
// with JSON from A and then sink C with DELIMITED from B, C
// will use the default limiter, as opposed to |)
// see https://github.com/confluentinc/ksql/issues/4368 for
// more context
return Optional.empty();
}

if (sink.getProperties().getValueDelimiter().isPresent()) {
return sink.getProperties().getValueDelimiter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,24 @@
{"topic": "S2", "key": "100", "value": "100\t100\t500", "timestamp": 0},
{"topic": "S2", "key": "100", "value": "100\t100\t100", "timestamp": 0}
]
},
{
"name": "validate cannot specify delimiter with json format",
"statements": [
"CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');"
],
"topics": [
{
"name": "test_topic",
"format": "JSON"
}
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Delimiter only supported with DELIMITED format"
},
"inputs": [],
"outputs": []
}
]
}

0 comments on commit b537f70

Please sign in to comment.