Skip to content

Commit

Permalink
Remove old timeout when deadline changes
Browse files Browse the repository at this point in the history
I had a memory issue and @franz1981 suggested #211 as the cause. This patch is my fix for that bug, though I don't believe my mem issue was ultimately caused by this.

This PR does the legwork for adding ioringOpTimeoutRemove, and implementing a test. However two things can still be improved:

- [ ] could use IORING_TIMEOUT_UPDATE (see #211) to save one sqe.
- [ ] there may be a race in IOUringEventLoop between the addTimeout and the IORING_OP_TIMEOUT handler. If the kernel fires a deadline cqe, then we send a deadline update sqe, and only then we process the first cqe, prevDeadlineNanos is NONE even though we've submitted a new deadline. I'm not sure if this can actually happen since deadline changes should only adjust the deadline downwards, not upwards? Not sure.
  • Loading branch information
yawkat committed Jan 24, 2024
1 parent cdfcb8a commit 4cb676d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,13 @@ protected void run() {
try {
if (!hasTasks()) {
if (curDeadlineNanos != prevDeadlineNanos) {
if (prevDeadlineNanos != NONE) {
submissionQueue.removeTimeout((short) 0);
}
if (curDeadlineNanos != NONE) {
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
}
prevDeadlineNanos = curDeadlineNanos;
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
}

// Check there were any completion events to process
Expand Down Expand Up @@ -251,6 +256,8 @@ private void handle(int fd, int res, int flags, byte op, short data) {
if (res == Native.ERRNO_ETIME_NEGATIVE) {
prevDeadlineNanos = NONE;
}
} else if (op == Native.IORING_OP_TIMEOUT_REMOVE) {
// do nothing
} else {
// Remaining events should be channel-specific
final AbstractIOUringChannel channel = channels.get(fd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ boolean addTimeout(long nanoSeconds, short extraData) {
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, 0, -1, timeoutMemoryAddress, 1, 0, extraData);
}

boolean removeTimeout(short extraData) {
return enqueueSqe(Native.IORING_OP_TIMEOUT_REMOVE, 0, 0, -1, encode(-1, Native.IORING_OP_TIMEOUT, extraData), 0, 0, extraData);
}

boolean addPollIn(int fd) {
return addPoll(fd, Native.POLLIN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void run() {
static final int ERRNO_ETIME_NEGATIVE = -NativeStaticallyReferencedJniMethods.etime();
static final byte IORING_OP_POLL_ADD = NativeStaticallyReferencedJniMethods.ioringOpPollAdd();
static final byte IORING_OP_TIMEOUT = NativeStaticallyReferencedJniMethods.ioringOpTimeout();
static final byte IORING_OP_TIMEOUT_REMOVE = NativeStaticallyReferencedJniMethods.ioringOpTimeoutRemove();
static final byte IORING_OP_ACCEPT = NativeStaticallyReferencedJniMethods.ioringOpAccept();
static final byte IORING_OP_READ = NativeStaticallyReferencedJniMethods.ioringOpRead();
static final byte IORING_OP_WRITE = NativeStaticallyReferencedJniMethods.ioringOpWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private NativeStaticallyReferencedJniMethods() { }
static native byte ioringOpPollAdd();
static native byte ioringOpPollRemove();
static native byte ioringOpTimeout();
static native byte ioringOpTimeoutRemove();
static native byte ioringOpAccept();
static native byte ioringOpRead();
static native byte ioringOpWrite();
Expand Down
5 changes: 5 additions & 0 deletions transport-native-io_uring/src/main/c/netty_io_uring_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ static jbyte netty_io_uring_ioringOpTimeout(JNIEnv* env, jclass clazz) {
return IORING_OP_TIMEOUT;
}

static jbyte netty_io_uring_ioringOpTimeoutRemove(JNIEnv* env, jclass clazz) {
return IORING_OP_TIMEOUT_REMOVE;
}

static jbyte netty_io_uring_ioringOpAccept(JNIEnv* env, jclass clazz) {
return IORING_OP_ACCEPT;
}
Expand Down Expand Up @@ -601,6 +605,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "ioringOpPollAdd", "()B", (void *) netty_io_uring_ioringOpPollAdd },
{ "ioringOpPollRemove", "()B", (void *) netty_io_uring_ioringOpPollRemove },
{ "ioringOpTimeout", "()B", (void *) netty_io_uring_ioringOpTimeout },
{ "ioringOpTimeoutRemove", "()B", (void *) netty_io_uring_ioringOpTimeoutRemove },
{ "ioringOpAccept", "()B", (void *) netty_io_uring_ioringOpAccept },
{ "ioringOpRead", "()B", (void *) netty_io_uring_ioringOpRead },
{ "ioringOpWrite", "()B", (void *) netty_io_uring_ioringOpWrite },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.charset.Charset;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -142,6 +143,64 @@ public void handle(int fd, int res, int flags, byte op, short mask) {
ringBuffer.close();
}

@Test
public void timeoutRemoveTest() throws Exception {

RingBuffer ringBuffer = Native.createRingBuffer(32);
IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();

assertNotNull(ringBuffer);
assertNotNull(submissionQueue);
assertNotNull(completionQueue);

Thread thread = new Thread() {
@Override
public void run() {
completionQueue.ioUringWaitCqe();
completionQueue.process(new IOUringCompletionQueueCallback() {
int i = 0;

@Override
public void handle(int fd, int res, int flags, byte op, short mask) {
if (i == 0) {
assertEquals(Native.IORING_OP_TIMEOUT, op);
assertEquals(-125, res);
} else if (i == 1) {
assertEquals(Native.IORING_OP_TIMEOUT_REMOVE, op);
} else {
fail();
}
i++;
}
});
completionQueue.ioUringWaitCqe();
completionQueue.process(new IOUringCompletionQueueCallback() {
@Override
public void handle(int fd, int res, int flags, byte op, short mask) {
System.out.println("evt " + op);
}
});
}
};
thread.start();
try {
Thread.sleep(80);
} catch (InterruptedException e) {
e.printStackTrace();
}

Duration d = Duration.ofSeconds(5);
submissionQueue.addTimeout(d.toNanos(), (short) 0);
submissionQueue.removeTimeout((short) 0);
submissionQueue.submit();

thread.join(d.multipliedBy(2).toMillis());
assertTrue(thread.isAlive());
thread.interrupt();
ringBuffer.close();
}

//Todo clean
@Test
public void eventfdTest() throws Exception {
Expand Down

0 comments on commit 4cb676d

Please sign in to comment.