From 6997894f0e65c7af0483298cde031d6f0c5fe3ff Mon Sep 17 00:00:00 2001 From: Rohan Date: Thu, 27 Feb 2020 09:40:48 -0800 Subject: [PATCH] fix feedback --- .../ksql/util/TransientQueryMetadata.java | 6 ++++-- .../ksql/util/TransientQueryMetadataTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index edb2b4fab07d..938419443be4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -104,8 +104,10 @@ public void close(final boolean cleanUp) { // write to the blocking queue, otherwise super.close call can deadlock: rowQueue.close(); - // Now safe to close: - super.close(cleanUp); + // Now safe to close + // We always pass cleanUp true, since transient queries should always have + // their internal topics cleaned up. + super.close(true); isRunning.set(false); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java index db35665cb8de..66cda3b6001d 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/TransientQueryMetadataTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.BlockingRowQueue; @@ -87,4 +88,22 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() { inOrder.verify(rowQueue).close(); inOrder.verify(kafkaStreams).close(any()); } + + @Test + public void shouldCloseWithCleanUpTrue() { + // When: + query.close(true); + + // Then: + verify(closeCallback).accept(query, true); + } + + @Test + public void shouldCloseWithCleanUpTrueEvenIfPassedFalse() { + // When: + query.close(false); + + // Then: + verify(closeCallback).accept(query, true); + } } \ No newline at end of file