Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add wait for exit function in WakeableLooper and fix exiting code in SocketChannelHelper #2651

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* A WakeableLooper is a class that could:
Expand Down Expand Up @@ -52,12 +54,15 @@ public abstract class WakeableLooper {
// We will also multiple 1000*1000 to convert mill-seconds to nano-seconds
private static final Duration INFINITE_FUTURE = Duration.ofMillis(Integer.MAX_VALUE);
private volatile boolean exitLoop;
// Used as a flag that the looper has exited after exitLoop() is called.
private final CountDownLatch exitCountDownLatch;

public WakeableLooper() {
exitLoop = false;
tasksOnWakeup = new ArrayList<Runnable>();
timers = new PriorityQueue<TimerTask>();
exitTasks = new ArrayList<>();
exitCountDownLatch = new CountDownLatch(1);
}

public void clear() {
Expand Down Expand Up @@ -95,6 +100,21 @@ private void onExit() {
for (Runnable r : exitTasks) {
r.run();
}
exitCountDownLatch.countDown();
}

/**
* After exitLoop() is called, caller can use waitForExit() to make sure
* the looper has finished/skipped all sheduled tasks and the runOnce() function
* won't be called any more.
* @return true if the count down lanch reaches 0, false if the wait times out or is interrupted.
*/
public boolean waitForExit(long timeout, TimeUnit unit) {
try {
return exitCountDownLatch.await(timeout, unit);
} catch (InterruptedException e) {
return false;
}
}

protected abstract void doWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.twitter.heron.common.basics.ByteAmount;
Expand Down Expand Up @@ -213,6 +214,10 @@ public void write() {
// Force to flush all data in underneath buffer queue to socket with best effort
// It is most likely happen when we are handling some unexpected cases, such as exiting
public void forceFlushWithBestEffort() {
looper.exitLoop();
// Wait for NIO loop to confirm stopping process.
looper.waitForExit(10, TimeUnit.SECONDS);
Copy link
Contributor

@huijunw huijunw Jan 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the looper does not stop after 10 seconds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function stops waiting and returns false.

In this case, the code works pretty much the same way as before after the waiting. Except exitLoop() set a flag to true so the looper won't do any real work afterwards.


LOG.info("Forcing to flush data to socket with best effort.");
while (!outgoingPacketsToWrite.isEmpty()) {
int writeState = outgoingPacketsToWrite.poll().writeToChannel(socketChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Assert;
Expand All @@ -27,7 +28,7 @@
* WakeableLooper Tester.
*/
public class WakeableLooperTest {
private static int globalValue;
private static volatile int globalValue;
private WakeableLooper slaveLooper;

@Before
Expand All @@ -41,6 +42,19 @@ public void after() {
slaveLooper = null;
}

class LooperThread extends Thread {
private WakeableLooper looper;

LooperThread(WakeableLooper looper) {
super();
this.looper = looper;
}

public void run() {
looper.loop();
}
}

/**
* Method: loop()
*/
Expand Down Expand Up @@ -142,6 +156,65 @@ public void run() {
Assert.assertEquals(10, globalValue);
}


/**
* Method: waitForExit()
*/
@Test
public void testWaitForExit() {
int sleepTimeMS = 200;
Runnable r = new Runnable() {
@Override
public void run() {
try {
slaveLooper.exitLoop(); // Exit after the first wake up
Thread.sleep(sleepTimeMS);
globalValue = 10;
} catch (InterruptedException e) {
return;
}
}
};
LooperThread looperThread = new LooperThread(slaveLooper);
looperThread.start();
long startTime = System.nanoTime();
slaveLooper.addTasksOnWakeup(r);
// Wait for it to finish.
boolean ret = slaveLooper.waitForExit(sleepTimeMS * 2, TimeUnit.MILLISECONDS);
long endTime = System.nanoTime();

Assert.assertTrue(ret);
Assert.assertTrue(endTime - startTime >= sleepTimeMS * 1000);
Assert.assertEquals(10, globalValue);
}

@Test
public void testWaitForExitTimeout() {
int sleepTimeMS = 200;
Runnable r = new Runnable() {
@Override
public void run() {
try {
slaveLooper.exitLoop(); // Exit after the first wake up
Thread.sleep(sleepTimeMS);
globalValue = 10;
} catch (InterruptedException e) {
return;
}
}
};
LooperThread looperThread = new LooperThread(slaveLooper);
looperThread.start();
long startTime = System.nanoTime();
slaveLooper.addTasksOnWakeup(r);
// Wait for it to finish.
boolean ret = slaveLooper.waitForExit(sleepTimeMS / 10, TimeUnit.MILLISECONDS);
long endTime = System.nanoTime();

Assert.assertFalse(ret);
Assert.assertEquals(6, globalValue);
}

/**
* Method: getNextTimeoutInterval()
*/
Expand Down