Skip to content

Commit

Permalink
Remove old timeout when deadline changes (#232)
Browse files Browse the repository at this point in the history
Remove old timeout when deadline changes
Motivation:

When the event loop timer deadline changes, the old timer would not be removed. This could potentially lead to memory issues.

Modification:

Use IORING_OP_TIMEOUT_REMOVE to remove a previously set timeout before a new one is registered. Use a generation counter to prevent race conditions between timeout add/remove in sqe/cqe.

Result:

Old timers are removed. At any time, there is only at most one timer per event loop. Fixes #211

---------

Co-authored-by: Norman Maurer <[email protected]>
  • Loading branch information
yawkat and normanmaurer authored Feb 19, 2024
1 parent bfdadb2 commit 1a6c356
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public final class IOUringEventLoop extends SingleThreadEventLoop {
private final Runnable submitIOTask = () -> getRingBuffer().ioUringSubmissionQueue().submit();

private long prevDeadlineNanos = NONE;
/**
* This is a "generation" counter that is passed to addTimeout. It ensures that the expiry of a previous timeout
* doesn't make us think the current timeout has expired, which could lead to wrongly not removing the current
* timeout when it's adjusted again.
*/
private short prevTimeoutGeneration = -1;
private boolean pendingWakeup;

IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold,
Expand Down Expand Up @@ -179,8 +185,15 @@ protected void run() {
try {
if (!hasTasks()) {
if (curDeadlineNanos != prevDeadlineNanos) {
if (prevDeadlineNanos != NONE) {
submissionQueue.removeTimeout(prevTimeoutGeneration);
}
if (curDeadlineNanos != NONE) {
short generation = (short) (prevTimeoutGeneration + 1);
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), generation);
prevTimeoutGeneration = generation;
}
prevDeadlineNanos = curDeadlineNanos;
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
}

// Check there were any completion events to process
Expand Down Expand Up @@ -249,8 +262,14 @@ private void handle(int fd, int res, int flags, byte op, short data) {
addEventFdRead(ringBuffer.ioUringSubmissionQueue());
} else if (op == Native.IORING_OP_TIMEOUT) {
if (res == Native.ERRNO_ETIME_NEGATIVE) {
prevDeadlineNanos = NONE;
if (data == prevTimeoutGeneration) {
prevDeadlineNanos = NONE;
} else {
logger.trace("Timeout of previous generation timer");
}
}
} else if (op == Native.IORING_OP_TIMEOUT_REMOVE) {
// do nothing
} else {
// Remaining events should be channel-specific
final AbstractIOUringChannel channel = channels.get(fd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ boolean addTimeout(long nanoSeconds, short extraData) {
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, 0, -1, timeoutMemoryAddress, 1, 0, extraData);
}

boolean removeTimeout(short extraData) {
return enqueueSqe(Native.IORING_OP_TIMEOUT_REMOVE, 0, 0, -1, encode(-1, Native.IORING_OP_TIMEOUT, extraData), 0, 0, extraData);
}

boolean addPollIn(int fd) {
return addPoll(fd, Native.POLLIN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void run() {
static final int ERRNO_ETIME_NEGATIVE = -NativeStaticallyReferencedJniMethods.etime();
static final byte IORING_OP_POLL_ADD = NativeStaticallyReferencedJniMethods.ioringOpPollAdd();
static final byte IORING_OP_TIMEOUT = NativeStaticallyReferencedJniMethods.ioringOpTimeout();
static final byte IORING_OP_TIMEOUT_REMOVE = NativeStaticallyReferencedJniMethods.ioringOpTimeoutRemove();
static final byte IORING_OP_ACCEPT = NativeStaticallyReferencedJniMethods.ioringOpAccept();
static final byte IORING_OP_READ = NativeStaticallyReferencedJniMethods.ioringOpRead();
static final byte IORING_OP_WRITE = NativeStaticallyReferencedJniMethods.ioringOpWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private NativeStaticallyReferencedJniMethods() { }
static native byte ioringOpPollAdd();
static native byte ioringOpPollRemove();
static native byte ioringOpTimeout();
static native byte ioringOpTimeoutRemove();
static native byte ioringOpAccept();
static native byte ioringOpRead();
static native byte ioringOpWrite();
Expand Down
5 changes: 5 additions & 0 deletions transport-native-io_uring/src/main/c/netty_io_uring_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ static jbyte netty_io_uring_ioringOpTimeout(JNIEnv* env, jclass clazz) {
return IORING_OP_TIMEOUT;
}

static jbyte netty_io_uring_ioringOpTimeoutRemove(JNIEnv* env, jclass clazz) {
return IORING_OP_TIMEOUT_REMOVE;
}

static jbyte netty_io_uring_ioringOpAccept(JNIEnv* env, jclass clazz) {
return IORING_OP_ACCEPT;
}
Expand Down Expand Up @@ -601,6 +605,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "ioringOpPollAdd", "()B", (void *) netty_io_uring_ioringOpPollAdd },
{ "ioringOpPollRemove", "()B", (void *) netty_io_uring_ioringOpPollRemove },
{ "ioringOpTimeout", "()B", (void *) netty_io_uring_ioringOpTimeout },
{ "ioringOpTimeoutRemove", "()B", (void *) netty_io_uring_ioringOpTimeoutRemove },
{ "ioringOpAccept", "()B", (void *) netty_io_uring_ioringOpAccept },
{ "ioringOpRead", "()B", (void *) netty_io_uring_ioringOpRead },
{ "ioringOpWrite", "()B", (void *) netty_io_uring_ioringOpWrite },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.charset.Charset;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -142,6 +143,67 @@ public void handle(int fd, int res, int flags, byte op, short mask) {
ringBuffer.close();
}

@Test
public void timeoutRemoveTest() throws Exception {

RingBuffer ringBuffer = Native.createRingBuffer(32);
IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();

assertNotNull(ringBuffer);
assertNotNull(submissionQueue);
assertNotNull(completionQueue);

Thread thread = new Thread() {
@Override
public void run() {
completionQueue.ioUringWaitCqe();
completionQueue.process(new IOUringCompletionQueueCallback() {
boolean seenTimeout = false;
boolean seenTimeoutRemove = false;

@Override
public void handle(int fd, int res, int flags, byte op, short mask) {
if (op == Native.IORING_OP_TIMEOUT) {
assertFalse(seenTimeout);
seenTimeout = true;
// -ECANCELED
assertEquals(-125, res);
} else if (op == Native.IORING_OP_TIMEOUT_REMOVE) {
assertFalse(seenTimeoutRemove);
seenTimeoutRemove = true;
} else {
fail();
}
}
});
completionQueue.ioUringWaitCqe();
completionQueue.process(new IOUringCompletionQueueCallback() {
@Override
public void handle(int fd, int res, int flags, byte op, short mask) {
System.out.println("evt " + op);
}
});
}
};
thread.start();
try {
Thread.sleep(80);
} catch (InterruptedException e) {
e.printStackTrace();
}

Duration d = Duration.ofSeconds(5);
submissionQueue.addTimeout(d.toNanos(), (short) 0);
submissionQueue.removeTimeout((short) 0);
submissionQueue.submit();

thread.join(d.multipliedBy(2).toMillis());
assertTrue(thread.isAlive());
thread.interrupt();
ringBuffer.close();
}

//Todo clean
@Test
public void eventfdTest() throws Exception {
Expand Down

0 comments on commit 1a6c356

Please sign in to comment.