Skip to content

Commit

Permalink
Fixing Deadlock Bug Between ZK Callback & Event Thread On Acquiring C…
Browse files Browse the repository at this point in the history
…oordinator Object (#964)

* Fixing Deadlock Bug Between ZK Callback & Event Thread On Acquiring Coordinator Object

* Add INFO level logging

* Time Bound Waiting For CVs

* Added Couple Test Scenarios Ensuring No Deadlocks

* Checkstyle Fixes

* Use Intrinsic CV instead of explicity defining one

* Moving the Notify Method to Coordinator & Suppressing the Naked Notify Warning

* Customizing the heartbeat period for tests to 1 seconds if not explicity provided

---------

Co-authored-by: Shrinand Thakkar <[email protected]>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar authored Oct 30, 2023
1 parent 0faec8e commit 55d2b08
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.connectors.kafka;

import java.time.Duration;
import java.util.Properties;

import com.linkedin.datastream.common.zk.ZkClient;
Expand Down Expand Up @@ -41,6 +42,9 @@ public static Coordinator createCoordinator(String zkAddr, String cluster, Prope
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
CachedDatastreamReader cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand All @@ -178,6 +181,9 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
if (!props.containsKey(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS)) {
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));
}
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Expand Down Expand Up @@ -2986,6 +2992,7 @@ public void testCoordinatorLeaderCleanupTasksPostElection() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3098,6 +3105,7 @@ public void testNewlyElectedLeaderRevokesAssignmentTokens() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(1000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3185,6 +3193,7 @@ void testOnSessionExpired(boolean handleNewSession) throws DatastreamException,
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_REINIT_ON_NEW_ZK_SESSION, String.valueOf(handleNewSession));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3275,6 +3284,7 @@ public void testClaimAssignmentTokensForStoppingStreams() throws Exception {
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3341,6 +3351,7 @@ public void testTokensNotClaimedForConnectorThatFailedToStop() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_TIMEOUT_MS, "100");
props.put(CoordinatorConfig.CONFIG_TASK_STOP_CHECK_RETRY_PERIOD_MS, "10");
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3419,6 +3430,7 @@ public void testLeaderDoesNotPollForTokensIfFeatureIsDisabled() throws Exception
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(false));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3452,6 +3464,7 @@ public void testLeaderPollsForTokensAndRevokesThemIfTheyAreUnclaimed() throws Ex
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -3503,6 +3516,7 @@ public void testLeaderPollsForTokensAndMarksTheDatastreamStoppedIfTheyAreClaimed
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ENABLE_ASSIGNMENT_TOKENS, String.valueOf(true));
props.put(CoordinatorConfig.CONFIG_STOP_PROPAGATION_TIMEOUT_MS, String.valueOf(6000));
props.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(Duration.ofSeconds(1).toMillis()));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Expand Down Expand Up @@ -4077,6 +4091,153 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
});
}

@Test
public void testSessionExpiryCallbackThreadAttemptingToAcquireCoordinatorObjectBeforeHandlingEvent() throws Exception {
String testCluster = "dummyCluster";
long testHeartbeatPeriod = Duration.ofSeconds(2).toMillis();

Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
properties.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
// custom heartbeat period of 2 second.
properties.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(testHeartbeatPeriod));

final Coordinator.CoordinatorEventProcessor[] testCoordinatorEventProcessor = {null};
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {
@Override
protected synchronized void createEventThread() {
testCoordinatorEventProcessor[0] = new CoordinatorEventProcessor() {
// Mimicking the coordinator's event thread's runnable method.
// 1. Sleeping before calling handleEvent to let zk session expiry
// thread acquire coordinator object before event thread enters
// handleEvent.
// 2. Handling a No-Op Event.
// 3. Notifying the zk session expiry threads to attempt acquiring the
// coordinator object.
@Override
public void run() {
// This flag will be enabled when an interrupt was called
// on the event thread while the event thread was sleeping.
boolean isInterruptedInSleep = false;
while (!isInterrupted()) {
try {
// Step 1
// Making sure we sleep for more than heartbeat period to
// mock the scenario where the zk session expiry thread
// acquires the coordinator object before the event thread does.
Thread.sleep(testHeartbeatPeriod + Duration.ofMillis(500).toMillis());
} catch (InterruptedException e) {
isInterruptedInSleep = true;
}
// Step 2
// Handling an event requires acquiring the coordinator's object.
handleEvent(new CoordinatorEvent(CoordinatorEvent.EventType.NO_OP, null));
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
}

@Override
CoordinatorEventProcessor getEventThread() {
return testCoordinatorEventProcessor[0];
}
});

coordinator.start();
ZkClient zkClient = new ZkClient(_zkConnectionString);

coordinator.onSessionExpired();
Assert.assertTrue(PollUtils.poll(coordinator::isZkSessionExpired, 100, testHeartbeatPeriod));
// Making sure we don't run into a deadlock scenario.
Assert.assertTrue(PollUtils.poll(() -> !coordinator.getEventThread().isAlive(), 100, testHeartbeatPeriod));

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();
}

@Test
public void testSessionExpiryCallbackThreadAttemptingToAcquireCoordinatorObjectAfterHandlingEvent() throws Exception {
String testCluster = "dummyCluster";
long testHeartbeatPeriod = Duration.ofSeconds(2).toMillis();

Properties properties = new Properties();
properties.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
properties.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
// custom heartbeat period of 2 second.
properties.put(CoordinatorConfig.CONFIG_HEARTBEAT_PERIOD_MS, String.valueOf(testHeartbeatPeriod));

final Coordinator.CoordinatorEventProcessor[] testCoordinatorEventProcessor = {null};
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {
@Override
protected synchronized void createEventThread() {
testCoordinatorEventProcessor[0] = new CoordinatorEventProcessor() {
// Mimicking the coordinator's event thread's runnable method.
// 1. Handling a No-Op Event.
// 2. Sleeping after calling handleEvent to let zk session expiry
// thread wait for notification from event thread to access coordinator
// object.
// 3. Notifying the zk session expiry threads to attempt acquiring the
// coordinator object.
@Override
public void run() {
// This flag will be enabled when an interrupt was called
// on the event thread while the event thread was sleeping.
boolean isInterruptedInSleep = false;
// Step 1
// Handling an event requires acquiring the coordinator's object.
handleEvent(new CoordinatorEvent(CoordinatorEvent.EventType.NO_OP, null));
while (!isInterrupted()) {
try {
// Step 2
// Making sure we sleep for less than heartbeat period to
// mock the scenario where the zk session expiry thread
// is waiting for notification from the event thread.
Thread.sleep(testHeartbeatPeriod - Duration.ofMillis(500).toMillis());
} catch (InterruptedException e) {
isInterruptedInSleep = true;
}
// Step 3
notifyThreadsWaitingForCoordinatorObjectSynchronization();
if (isInterruptedInSleep) {
break;
}
}
}
};
testCoordinatorEventProcessor[0].setDaemon(true);
}

@Override
CoordinatorEventProcessor getEventThread() {
return testCoordinatorEventProcessor[0];
}
});


coordinator.start();
ZkClient zkClient = new ZkClient(_zkConnectionString);

coordinator.onSessionExpired();
Assert.assertTrue(PollUtils.poll(coordinator::isZkSessionExpired, 100, testHeartbeatPeriod));
// Making sure we don't run into a deadlock scenario.
Assert.assertTrue(PollUtils.poll(() -> !coordinator.getEventThread().isAlive(), 100, testHeartbeatPeriod));

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();
}

// This helper function helps compare the requesting topics with the topics reflected in the server.
private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator,
Set<String> requestedThroughputViolatingTopics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,30 @@ public void start() {
_heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);
}

private synchronized void createEventThread() {
protected synchronized void createEventThread() {
_eventThread = new CoordinatorEventProcessor();
_eventThread.setDaemon(true);
}

private synchronized void startEventThread() {
if (!_shutdown) {
_eventThread.start();
CoordinatorEventProcessor eventThread = getEventThread();
eventThread.start();
}
}

private synchronized boolean stopEventThread() {
// interrupt the thread if it's not gracefully shutdown
while (_eventThread.isAlive()) {
CoordinatorEventProcessor eventThread = getEventThread();
while (eventThread.isAlive()) {
// Waits to acquire the Coordinator object for a maximum of _heartbeat period.
// The time bound waiting prevents the caller thread to not infinitely wait if
// the event thread is already shutdown.
waitForNotificationFromEventThread(_heartbeatPeriod);
try {
_eventThread.interrupt();
_eventThread.join(EVENT_THREAD_SHORT_JOIN_TIMEOUT);
_log.info("Attempting to interrupt the event thread.");
eventThread.interrupt();
eventThread.join(EVENT_THREAD_SHORT_JOIN_TIMEOUT);
} catch (InterruptedException e) {
_log.warn("Exception caught while interrupting the event thread", e);
return true;
Expand All @@ -346,16 +353,40 @@ private synchronized boolean stopEventThread() {
return false;
}

// Waiting for the event thread to die.
private synchronized boolean waitForEventThreadToJoin() {
CoordinatorEventProcessor eventThread = getEventThread();
if (!eventThread.isAlive()) {
return false;
}
// Waits to acquire the Coordinator object for a maximum of _heartbeat period.
// The time bound waiting prevents the caller thread to not infinitely wait if
// the event thread is already shutdown.
waitForNotificationFromEventThread(_heartbeatPeriod);
try {
_eventThread.join(EVENT_THREAD_LONG_JOIN_TIMEOUT);
_log.info("Waiting for {} milliseconds for the event thread to die.", EVENT_THREAD_LONG_JOIN_TIMEOUT);
eventThread.join(EVENT_THREAD_LONG_JOIN_TIMEOUT);
} catch (InterruptedException e) {
_log.warn("Exception caught while waiting the event thread to stop", e);
return true;
}
return false;
}

// Waits for a notification for specified duration from the event thread before acquiring the Coordinator object.
private synchronized void waitForNotificationFromEventThread(Duration duration) {
try {
// This intrinsic conditional variable helps to halt threads (zk callback threads, main server thread) before
// attempting to acquire the Coordinator object. We never halt the event thread (coordinator thread)
// explicitly via this CV.
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
} catch (InterruptedException exception) {
_log.warn("Exception caught while waiting for the notification from the event thread", exception);
}
}

/**
* Stop coordinator (and all connectors)
*/
Expand Down Expand Up @@ -2206,6 +2237,14 @@ private void populateDatastreamDestinationFromExistingDatastream(Datastream data
existingStream.getMetadata().get(DatastreamMetadataConstants.TASK_PREFIX));
}

// Via the intrinsic conditional variable, notify other threads that might
// be waiting on acquiring access on the Coordinator object.
// We are only calling notify on the synchronized Coordinator Object's ("this") waiting threads.
// Suppressing the Naked_Notify warning on this.
protected synchronized void notifyThreadsWaitingForCoordinatorObjectSynchronization() {
this.notifyAll();
}

@Override
public List<BrooklinMetricInfo> getMetricInfos() {
return _metrics.getMetricInfos();
Expand Down Expand Up @@ -2296,7 +2335,7 @@ CachedDatastreamReader getDatastreamCache() {
return _datastreamCache;
}

private class CoordinatorEventProcessor extends Thread {
protected class CoordinatorEventProcessor extends Thread {
@Override
public void run() {
_log.info("START CoordinatorEventProcessor thread");
Expand All @@ -2305,6 +2344,7 @@ public void run() {
CoordinatorEvent event = _eventQueue.take();
if (event != null) {
handleEvent(event);
notifyThreadsWaitingForCoordinatorObjectSynchronization();
}
} catch (InterruptedException e) {
_log.warn("CoordinatorEventProcessor interrupted", e);
Expand Down
Loading

0 comments on commit 55d2b08

Please sign in to comment.