diff --git a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java index d1b449c1..409d6e47 100644 --- a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java +++ b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java @@ -114,7 +114,7 @@ public CompletableFuture triggerCheckpoint( final String checkpointName = createCheckpointName(checkpointId); final CompletableFuture checkpointResult = - this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService) + this.readerGroup.initiateCheckpoint(checkpointName) .exceptionally(e -> { if (e instanceof MaxNumberOfCheckpointsExceededException) { readerGroup.cancelOutstandingCheckpoints(); diff --git a/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java b/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java index 77a878e9..c25c7c40 100644 --- a/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java +++ b/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java @@ -66,10 +66,10 @@ public void testTriggerCheckpoint() throws Exception { CompletableFuture checkpointPromise = new CompletableFuture<>(); TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig); - when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise); + when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise); CompletableFuture checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor()); assertThat(checkpointFuture).isNotNull(); - verify(hook.readerGroup).initiateCheckpoint(anyString(), any()); + verify(hook.readerGroup).initiateCheckpoint(anyString()); // complete the checkpoint promise Checkpoint expectedCheckpoint = mock(Checkpoint.class); @@ -85,11 +85,11 @@ public void testTriggerCheckpointTimeout() throws Exception { CompletableFuture checkpointPromise = new CompletableFuture<>(); TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig); - when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise); + when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise); CompletableFuture checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor()); assertThat(checkpointFuture).isNotNull(); - verify(hook.readerGroup).initiateCheckpoint(anyString(), any()); + verify(hook.readerGroup).initiateCheckpoint(anyString()); // invoke the timeout callback hook.invokeScheduledCallables(); @@ -104,11 +104,11 @@ public void testCancelWhenExceedingMaxOutstandingCheckpoints() throws Exception checkpointPromise.completeExceptionally(new MaxNumberOfCheckpointsExceededException("test")); TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig); - when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise); + when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise); CompletableFuture checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor()); assertThat(checkpointFuture).isNotNull(); - verify(hook.readerGroup).initiateCheckpoint(anyString(), any()); + verify(hook.readerGroup).initiateCheckpoint(anyString()); // invoke the cancelOutstandingCheckpoints verify(hook.readerGroup).cancelOutstandingCheckpoints();