Skip to content

Commit

Permalink
[FLINK-11594][tests] Add assert to test that TM re-connects to JM
Browse files Browse the repository at this point in the history
  • Loading branch information
GJL committed Feb 18, 2019
1 parent 11972e1 commit 1e19407
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,12 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
final UUID jmLeaderId = UUID.randomUUID();

final ResourceID jmResourceId = ResourceID.generate();
final CountDownLatch registrationAttempts = new CountDownLatch(2);
final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = new CompletableFuture<>();
final CompletableFuture<ResourceID> disconnectTaskManagerFuture = new CompletableFuture<>();
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
.setRegisterTaskManagerFunction((s, taskManagerLocation) -> {
registrationAttempts.countDown();
taskManagerLocationFuture.complete(taskManagerLocation);
return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId));
})
Expand Down Expand Up @@ -371,6 +373,9 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
final ResourceID resourceID = disconnectTaskManagerFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS);
assertThat(resourceID, equalTo(taskManagerLocation.getResourceID()));

assertTrue(
"The TaskExecutor should try to reconnect to the JM",
registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
Expand Down Expand Up @@ -449,9 +454,9 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

// the TaskExecutor should try to reconnect to the RM
registrationAttempts.await();

assertTrue(
"The TaskExecutor should try to reconnect to the RM",
registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
Expand Down

0 comments on commit 1e19407

Please sign in to comment.