Skip to content

Commit

Permalink
fix feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rodesai committed Feb 27, 2020
1 parent 2ec6f22 commit 6997894
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ public void close(final boolean cleanUp) {
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close(cleanUp);
// Now safe to close
// We always pass cleanUp true, since transient queries should always have
// their internal topics cleaned up.
super.close(true);
isRunning.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;

import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
Expand Down Expand Up @@ -87,4 +88,22 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() {
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close(any());
}

@Test
public void shouldCloseWithCleanUpTrue() {
// When:
query.close(true);

// Then:
verify(closeCallback).accept(query, true);
}

@Test
public void shouldCloseWithCleanUpTrueEvenIfPassedFalse() {
// When:
query.close(false);

// Then:
verify(closeCallback).accept(query, true);
}
}

0 comments on commit 6997894

Please sign in to comment.