Skip to content

Commit

Permalink
fix: fix parsing issue with LIST property overrides (#3601)
Browse files Browse the repository at this point in the history
Any config of type LIST would not be coerced properly by `KsqlRequest`.

e.g. setting `set 'consumer.interceptor.classes'='io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor';`

would see the value coerced to a LIST containing a single value of '[io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor]', rather than `io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor`.

This commit resolves this issue. No other `ConfigDef.Type` should have a similar issue, as they are all single value.
  • Loading branch information
big-andy-coates authored Oct 17, 2019
1 parent ba2b88b commit 6459fa1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -124,10 +125,21 @@ private static Map<String, Object> coerceTypes(final Map<String, Object> streams

private static Object coerceType(final String key, final Object value) {
try {
final String stringValue = value == null ? null : String.valueOf(value);
final String stringValue = value == null
? null
: value instanceof List
? listToString((List<?>) value)
: String.valueOf(value);

return PROPERTY_PARSER.parse(key, stringValue);
} catch (final Exception e) {
throw new KsqlException("Failed to set '" + key + "' to '" + value + "'", e);
}
}

private static String listToString(final List<?> value) {
return value.stream()
.map(e -> e == null ? null : e.toString())
.collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.json.JsonMapper;
Expand Down Expand Up @@ -202,6 +204,27 @@ public void shouldHandleNullPropertyValue() {
assertThat(props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("earliest"));
}

@Test
public void shouldHandleOverridesOfTypeList() {
// Given:
final KsqlRequest request = new KsqlRequest(
"sql",
ImmutableMap.of(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ImmutableList.of("some.type")
),
null
);

// When:
final Map<String, Object> props = request.getStreamsProperties();

// Then:
assertThat(
props,
hasEntry(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ImmutableList.of("some.type"))
);
}

private static String serialize(final KsqlRequest request) {
try {
return OBJECT_MAPPER.writeValueAsString(request);
Expand Down

0 comments on commit 6459fa1

Please sign in to comment.