Skip to content

Commit

Permalink
fix: UncaughtExceptionHandler not being set for Persistent Queries (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Dec 9, 2019
1 parent 1577801 commit e193a2a
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.query;

import io.confluent.ksql.util.KafkaStreamsUncaughtExceptionHandler;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -40,6 +41,7 @@ public BuildResult buildKafkaStreams(
props.putAll(conf);
final Topology topology = builder.build(props);
final KafkaStreams kafkaStreams = new KafkaStreams(topology, props, clientSupplier);
kafkaStreams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());
return new BuildResult(topology, kafkaStreams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KafkaStreamsUncaughtExceptionHandler;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -163,8 +162,6 @@ public TransientQueryMetadata buildTransientQuery(
final BuildResult built =
kafkaStreamsBuilder.buildKafkaStreams(streamsBuilder, streamsProperties);

built.kafkaStreams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());

final LogicalSchema transientSchema = buildTransientQuerySchema(schema);

return new TransientQueryMetadata(
Expand Down

0 comments on commit e193a2a

Please sign in to comment.