Skip to content

Commit

Permalink
Merge branch 'master' of github.com:confluentinc/ksql
Browse files Browse the repository at this point in the history
  • Loading branch information
big-andy-coates committed Dec 12, 2019
2 parents 54bc2be + da96259 commit 3eecd40
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 24 deletions.
13 changes: 13 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ public class KsqlConfig extends AbstractConfig {
public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of();

public static final String KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG =
"ksql.streams.shutdown.timeout.ms";
public static final Long KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT = 300_000L;
public static final String KSQL_SHUTDOWN_TIMEOUT_MS_DOC = "Timeout in "
+ "milliseconds to block waiting for the underlying streams instance to exit";


private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -499,6 +506,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DEFAULT,
Importance.MEDIUM,
KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_DOC
).define(
KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG,
Type.LONG,
KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT,
Importance.MEDIUM,
KSQL_SHUTDOWN_TIMEOUT_MS_DOC
)
.withClientSslSupport();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.query;

import static io.confluent.ksql.util.KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
Expand Down Expand Up @@ -176,8 +178,8 @@ public TransientQueryMetadata buildTransientQuery(
built.topology,
streamsProperties,
overrides,
queryCloseCallback
);
queryCloseCallback,
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG));
}

private static Optional<MaterializationInfo> getMaterializationInfo(final Object result) {
Expand Down Expand Up @@ -237,8 +239,8 @@ public PersistentQueryMetadata buildQuery(
ksqlQueryBuilder.getSchemas(),
streamsProperties,
overrides,
queryCloseCallback
);
queryCloseCallback,
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG));
}

private TransientQueryQueue buildTransientQueryQueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public PersistentQueryMetadata(
final QuerySchemas schemas,
final Map<String, Object> streamsProperties,
final Map<String, Object> overriddenProperties,
final Consumer<QueryMetadata> closeCallback
) {
final Consumer<QueryMetadata> closeCallback,
final Long closeTimeout) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
statementString,
Expand All @@ -76,7 +76,8 @@ public PersistentQueryMetadata(
topology,
streamsProperties,
overriddenProperties,
closeCallback);
closeCallback,
closeTimeout);

this.id = requireNonNull(id, "id");
this.resultTopic = requireNonNull(resultTopic, "resultTopic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -44,6 +45,7 @@ public class QueryMetadata {
private final Consumer<QueryMetadata> closeCallback;
private final Set<SourceName> sourceNames;
private final LogicalSchema logicalSchema;
private final Long closeTimeout;

private Optional<QueryStateListener> queryStateListener = Optional.empty();
private boolean everStarted = false;
Expand All @@ -59,8 +61,8 @@ protected QueryMetadata(
final Topology topology,
final Map<String, Object> streamsProperties,
final Map<String, Object> overriddenProperties,
final Consumer<QueryMetadata> closeCallback
) {
final Consumer<QueryMetadata> closeCallback,
final Long closeTimeout) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementString = Objects.requireNonNull(statementString, "statementString");
this.kafkaStreams = Objects.requireNonNull(kafkaStreams, "kafkaStreams");
Expand All @@ -76,6 +78,7 @@ protected QueryMetadata(
this.closeCallback = Objects.requireNonNull(closeCallback, "closeCallback");
this.sourceNames = Objects.requireNonNull(sourceNames, "sourceNames");
this.logicalSchema = Objects.requireNonNull(logicalSchema, "logicalSchema");
this.closeTimeout = Objects.requireNonNull(closeTimeout, "closeTimeout");
}

protected QueryMetadata(final QueryMetadata other, final Consumer<QueryMetadata> closeCallback) {
Expand All @@ -89,6 +92,7 @@ protected QueryMetadata(final QueryMetadata other, final Consumer<QueryMetadata>
this.sourceNames = other.sourceNames;
this.logicalSchema = other.logicalSchema;
this.closeCallback = Objects.requireNonNull(closeCallback, "closeCallback");
this.closeTimeout = other.closeTimeout;
}

public void registerQueryStateListener(final QueryStateListener queryStateListener) {
Expand Down Expand Up @@ -141,7 +145,7 @@ public boolean hasEverBeenStarted() {
}

public void close() {
kafkaStreams.close();
kafkaStreams.close(Duration.ofMillis(closeTimeout));

kafkaStreams.cleanUp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public TransientQueryMetadata(
final Topology topology,
final Map<String, Object> streamsProperties,
final Map<String, Object> overriddenProperties,
final Consumer<QueryMetadata> closeCallback) {
final Consumer<QueryMetadata> closeCallback,
final Long closeTimeout) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
super(
statementString,
Expand All @@ -63,8 +64,8 @@ public TransientQueryMetadata(
topology,
streamsProperties,
overriddenProperties,
closeCallback
);
closeCallback,
closeTimeout);
this.limitHandlerSetter = Objects.requireNonNull(limitHandlerSetter, "limitHandlerSetter");
this.rowQueue = Objects.requireNonNull(rowQueue, "rowQueue");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.util;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
Expand All @@ -28,6 +27,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
Expand All @@ -49,6 +49,7 @@ public class QueryMetadataTest {
.valueColumn(ColumnName.of("f0"), SqlTypes.STRING)
.build();
private static final Set<SourceName> SOME_SOURCES = ImmutableSet.of(SourceName.of("s1"), SourceName.of("s2"));
private static final Long closeTimeout = KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT;

@Mock
private Topology topoplogy;
Expand All @@ -72,8 +73,8 @@ public void setup() {
topoplogy,
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
closeCallback,
closeTimeout);
}

@Test
Expand Down Expand Up @@ -131,7 +132,7 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {

// Then:
final InOrder inOrder = inOrder(kafkaStreams, closeCallback);
inOrder.verify(kafkaStreams).close();
inOrder.verify(kafkaStreams).close(Duration.ofMillis(closeTimeout));
inOrder.verify(closeCallback).accept(query);
}

Expand All @@ -142,7 +143,7 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {

// Then:
final InOrder inOrder = inOrder(kafkaStreams);
inOrder.verify(kafkaStreams).close();
inOrder.verify(kafkaStreams).close(Duration.ofMillis(closeTimeout));
inOrder.verify(kafkaStreams).cleanUp();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QuerySchemas;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class QueryDescriptionFactoryTest {
private static final ImmutableSet<SourceName> SOURCE_NAMES = ImmutableSet.of(SourceName.of("s1"), SourceName.of("s2"));
private static final String SQL_TEXT = "test statement";
private static final String TOPOLOGY_TEXT = "Topology Text";
private static final Long closeTimeout = KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT;

@Mock
private Consumer<QueryMetadata> queryCloseCallback;
Expand Down Expand Up @@ -108,7 +110,8 @@ public void setUp() {
topology,
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);
queryCloseCallback,
closeTimeout);

transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery);

Expand All @@ -128,7 +131,8 @@ public void setUp() {
QuerySchemas.of(new LinkedHashMap<>()),
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);
queryCloseCallback,
closeTimeout);

persistentQueryDescription = QueryDescriptionFactory.forQueryMetadata(persistentQuery);
}
Expand Down Expand Up @@ -221,7 +225,8 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
topology,
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);
queryCloseCallback,
closeTimeout);

// When:
transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery);
Expand Down Expand Up @@ -255,7 +260,8 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() {
topology,
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);
queryCloseCallback,
closeTimeout);

// When:
transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class StreamedQueryResourceTest {
private static final KsqlConfig VALID_CONFIG = new KsqlConfig(ImmutableMap.of(
StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1"
));
private static final Long closeTimeout = KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_DEFAULT;

private static final String TOPIC_NAME = "test_stream";
private static final String PUSH_QUERY_STRING = "SELECT * FROM " + TOPIC_NAME + " EMIT CHANGES;";
Expand Down Expand Up @@ -378,7 +379,8 @@ public void shouldStreamRowsCorrectly() throws Throwable {
mock(Topology.class),
Collections.emptyMap(),
Collections.emptyMap(),
queryCloseCallback);
queryCloseCallback,
closeTimeout);

when(mockKsqlEngine.executeQuery(serviceContext,
ConfiguredStatement.of(query, requestStreamsProperties, VALID_CONFIG)))
Expand Down Expand Up @@ -451,7 +453,7 @@ public void shouldStreamRowsCorrectly() throws Throwable {
verify(mockKafkaStreams).start();
verify(mockKafkaStreams).setUncaughtExceptionHandler(any());
verify(mockKafkaStreams).cleanUp();
verify(mockKafkaStreams).close();
verify(mockKafkaStreams).close(Duration.ofMillis(closeTimeout));

// If one of the other threads has somehow managed to throw an exception without breaking things up until this
// point, we throw that exception now in the main thread and cause the test to fail
Expand Down

0 comments on commit 3eecd40

Please sign in to comment.