Skip to content

Commit

Permalink
Remove dependency on sun.misc.Unsafe (java-native-access#302)
Browse files Browse the repository at this point in the history
Motivation:

At the moment our code depends on sun.misc.Unsafe. We should better not do this if possible as it might go away at some point.

Modifications:

- Rewrite code to just use ByteBuffer directly and not Unsafe

Result:

No more dependency on sun.misc.Unsafe
  • Loading branch information
normanmaurer authored Jun 18, 2021
1 parent c0e7880 commit fdd5be1
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 215 deletions.
19 changes: 19 additions & 0 deletions src/main/java/io/netty/incubator/codec/quic/Quiche.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,30 @@ static long memoryAddress(ByteBuf buf) {
buffer_memory_address(buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()));
}

/**
* Returns the memory address of the given {@link ByteBuffer}. If you want to also respect the
* {@link ByteBuffer#position()} use {@link #memoryAddressWithPosition(ByteBuffer)}.
*
* @param buf the {@link ByteBuffer} of which we want to obtain the memory address..
* @return the memory address of this {@link ByteBuffer}.
*/
static long memoryAddress(ByteBuffer buf) {
assert buf.isDirect();
return buffer_memory_address(buf);
}

/**
* Returns the memory address of the given {@link ByteBuffer} taking its current {@link ByteBuffer#position()} into
* account.
*
* @param buf the {@link ByteBuffer} of which we want to obtain the memory address
* (taking its {@link ByteBuffer#position()} into account).
* @return the memory address of this {@link ByteBuffer}s position.
*/
static long memoryAddressWithPosition(ByteBuffer buf) {
return memoryAddress(buf) + buf.position();
}

@SuppressWarnings("deprecation")
static ByteBuf allocateNativeOrder(int capacity) {
// Just use Unpooled as the life-time of these buffers is long.
Expand Down
59 changes: 23 additions & 36 deletions src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ public void operationComplete(ChannelFuture future) {
private ByteBuffer key;
private CloseData closeData;
private QuicConnectionStats statsAtClose;

private long currentRecvInfoAddress;
private long currentSendInfoAddress;
private InetSocketAddress remote;
private boolean supportsDatagram;
private boolean recvDatagramPending;
Expand Down Expand Up @@ -209,9 +206,7 @@ void attachQuicheConnection(QuicheQuicConnection connection) {
this.traceId = new String(traceId);
}

connection.initInfoAddresses(remote);
currentRecvInfoAddress = connection.recvInfoAddress();
currentSendInfoAddress = connection.sendInfoAddress();
connection.initInfo(remote);

// Setup QLOG if needed.
QLogConfiguration configuration = config.getQLogConfiguration();
Expand Down Expand Up @@ -240,7 +235,7 @@ void attachQuicheConnection(QuicheQuicConnection connection) {

private void connect(Function<QuicChannel, ? extends QuicSslEngine> engineProvider,
long configAddr, int localConnIdLength,
boolean supportsDatagram, long sockaddr) throws Exception {
boolean supportsDatagram, ByteBuffer sockaddrMemory) throws Exception {
assert this.connection == null;
assert this.traceId == null;
assert this.key == null;
Expand All @@ -267,10 +262,11 @@ private void connect(Function<QuicChannel, ? extends QuicSslEngine> engineProvid
ByteBuffer connectId = address.connId.duplicate();
ByteBuf idBuffer = alloc().directBuffer(connectId.remaining()).writeBytes(connectId.duplicate());
try {
int sockaddrLen = SockaddrIn.write(sockaddr, remote);
int sockaddrLen = SockaddrIn.setAddress(sockaddrMemory, remote);
QuicheQuicConnection connection = quicheEngine.createConnection(ssl ->
Quiche.quiche_conn_new_with_tls(Quiche.memoryAddress(idBuffer) + idBuffer.readerIndex(),
idBuffer.readableBytes(), -1, -1, sockaddr, sockaddrLen,
idBuffer.readableBytes(), -1, -1,
Quiche.memoryAddressWithPosition(sockaddrMemory), sockaddrLen,
configAddr, ssl, false));
if (connection == null) {
failConnectPromiseAndThrow(new ConnectException());
Expand Down Expand Up @@ -745,7 +741,7 @@ private int streamSend0(long streamId, ByteBuf buffer, boolean fin) throws Close

private int streamSend(long streamId, ByteBuffer buffer, boolean fin) throws ClosedChannelException {
return Quiche.quiche_conn_stream_send(connectionAddressChecked(), streamId,
Quiche.memoryAddress(buffer) + buffer.position(), buffer.remaining(), fin);
Quiche.memoryAddressWithPosition(buffer), buffer.remaining(), fin);
}

StreamRecvResult streamRecv(long streamId, ByteBuf buffer) throws Exception {
Expand Down Expand Up @@ -891,13 +887,14 @@ private boolean connectionSendSegments(SegmentedDatagramPacketAllocator segmente
ByteBuf out = alloc().directBuffer(bufferSize);
int lastWritten = -1;
for (;;) {
long sendInfo = connection.nextSendInfoAddress(currentSendInfoAddress);
ByteBuffer sendInfo = connection.nextSendInfo();
InetSocketAddress sendToAddress = this.remote;

boolean done;
int writerIndex = out.writerIndex();
int written = Quiche.quiche_conn_send(
connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes(), sendInfo);
connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes(),
Quiche.memoryAddressWithPosition(sendInfo));
if (written == 0) {
// No need to create a new datagram packet. Just try again.
continue;
Expand Down Expand Up @@ -929,14 +926,10 @@ private boolean connectionSendSegments(SegmentedDatagramPacketAllocator segmente

boolean needWriteNow = false;

if (SockaddrIn.cmp(QuicheSendInfo.sockAddress(currentSendInfoAddress),
QuicheSendInfo.sockAddress(sendInfo)) != 0) {
// Update the current address so we can keep track when it change again.
currentSendInfoAddress = sendInfo;

if (connection.isSendInfoChanged()) {
// Change the cached address
InetSocketAddress oldRemote = remote;
remote = QuicheSendInfo.read(sendInfo);
remote = QuicheSendInfo.getAddress(sendInfo);
pipeline().fireUserEventTriggered(
new QuicConnectionMigrationEvent(oldRemote, remote));
needWriteNow = true;
Expand Down Expand Up @@ -1001,11 +994,12 @@ private boolean connectionSendSimple() {
long connAddr = connection.address();
boolean packetWasWritten = false;
for (;;) {
long sendInfo = connection.nextSendInfoAddress(currentSendInfoAddress);
ByteBuffer sendInfo = connection.nextSendInfo();
ByteBuf out = alloc().directBuffer(Quic.MAX_DATAGRAM_SIZE);
int writerIndex = out.writerIndex();
int written = Quiche.quiche_conn_send(
connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes(), sendInfo);
connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes(),
Quiche.memoryAddressWithPosition(sendInfo));

try {
if (Quiche.throwIfError(written)) {
Expand All @@ -1023,14 +1017,10 @@ private boolean connectionSendSimple() {
out.release();
continue;
}
if (SockaddrIn.cmp(QuicheSendInfo.sockAddress(currentSendInfoAddress),
QuicheSendInfo.sockAddress(sendInfo)) != 0) {
// Update the current address so we can keep track when it change again.
currentSendInfoAddress = sendInfo;

if (connection.isSendInfoChanged()) {
// Change the cached address
InetSocketAddress oldRemote = remote;
remote = QuicheSendInfo.read(sendInfo);
remote = QuicheSendInfo.getAddress(sendInfo);
pipeline().fireUserEventTriggered(
new QuicConnectionMigrationEvent(oldRemote, remote));
}
Expand Down Expand Up @@ -1176,15 +1166,13 @@ void connectionRecv(InetSocketAddress sender, ByteBuf buffer) {
int bufferReaderIndex = buffer.readerIndex();
long memoryAddress = Quiche.memoryAddress(buffer) + bufferReaderIndex;

long recvInfoAddress = connection.nextRecvInfoAddress(currentRecvInfoAddress);
QuicheRecvInfo.write(recvInfoAddress, sender);
ByteBuffer recvInfo = connection.nextRecvInfo();
QuicheRecvInfo.setRecvInfo(recvInfo, sender);

SocketAddress oldRemote = remote;

if (SockaddrIn.cmp(QuicheRecvInfo.sockAddress(currentRecvInfoAddress),
QuicheRecvInfo.sockAddress(recvInfoAddress)) != 0) {
if (connection.isRecvInfoChanged()) {
// Update the cached address
currentRecvInfoAddress = recvInfoAddress;
remote = sender;
pipeline().fireUserEventTriggered(
new QuicConnectionMigrationEvent(oldRemote, sender));
Expand All @@ -1194,7 +1182,8 @@ void connectionRecv(InetSocketAddress sender, ByteBuf buffer) {
try {
do {
// Call quiche_conn_recv(...) until we consumed all bytes or we did receive some error.
int res = Quiche.quiche_conn_recv(connAddr, memoryAddress, bufferReadable, recvInfoAddress);
int res = Quiche.quiche_conn_recv(connAddr, memoryAddress, bufferReadable,
Quiche.memoryAddressWithPosition(recvInfo));
boolean done;
try {
done = Quiche.throwIfError(res);
Expand Down Expand Up @@ -1246,8 +1235,6 @@ void connectionRecv(InetSocketAddress sender, ByteBuf buffer) {
}
} while (bufferReadable > 0);
} finally {
// Store for later usage.
currentRecvInfoAddress = recvInfoAddress;
buffer.skipBytes((int) (memoryAddress - Quiche.memoryAddress(buffer)));
if (tmpBuffer != null) {
tmpBuffer.release();
Expand Down Expand Up @@ -1432,11 +1419,11 @@ void finishConnect() {
// TODO: Come up with something better.
static QuicheQuicChannel handleConnect(Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider,
SocketAddress address, long config, int localConnIdLength,
boolean supportsDatagram, long sockaddr) throws Exception {
boolean supportsDatagram, ByteBuffer sockaddrMemory) throws Exception {
if (address instanceof QuicheQuicChannel.QuicheQuicChannelAddress) {
QuicheQuicChannel.QuicheQuicChannelAddress addr = (QuicheQuicChannel.QuicheQuicChannelAddress) address;
QuicheQuicChannel channel = addr.channel;
channel.connect(sslEngineProvider, config, localConnIdLength, supportsDatagram, sockaddr);
channel.connect(sslEngineProvider, config, localConnIdLength, supportsDatagram, sockaddrMemory);
return channel;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
final QuicheQuicChannel channel;
try {
channel = QuicheQuicChannel.handleConnect(sslEngineProvider, remoteAddress, config.nativeAddress(),
localConnIdLength, config.isDatagramSupported(), sockaddrMemory.memoryAddress());
localConnIdLength, config.isDatagramSupported(),
sockaddrMemory.internalNioBuffer(0, sockaddrMemory.capacity()));
} catch (Exception e) {
promise.setFailure(e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import io.netty.util.ReferenceCounted;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

final class QuicheQuicConnection {
private static final int TOTAL_RECV_INFO_SIZE = Quiche.SIZEOF_QUICHE_RECV_INFO + Quiche.SIZEOF_SOCKADDR_STORAGE;
private static final int QUICHE_SEND_INFOS_OFFSET = 2 * TOTAL_RECV_INFO_SIZE;
private final ReferenceCounted refCnt;
private final QuicheQuicSslEngine engine;

Expand All @@ -37,18 +37,35 @@ final class QuicheQuicConnection {
//
// We need to have every stored 2 times as we need to check if the last sockaddr has changed between
// quiche_conn_recv and quiche_conn_send calls. If this happens we know a QUIC connection migration did happen.
private final ByteBuf infoBuffer;
private final ByteBuf recvInfoBuffer;
private final ByteBuf sendInfoBuffer;

private boolean recvInfoFirst = true;
private boolean sendInfoFirst = true;
private final ByteBuffer recvInfoBuffer1;
private final ByteBuffer recvInfoBuffer2;
private final ByteBuffer sendInfoBuffer1;
private final ByteBuffer sendInfoBuffer2;

private long connection;

QuicheQuicConnection(long connection, QuicheQuicSslEngine engine, ReferenceCounted refCnt) {
this.connection = connection;
this.engine = engine;
this.refCnt = refCnt;
// TODO: Maybe cache these per thread as we only use them temporary within a limited scope.
infoBuffer = Quiche.allocateNativeOrder(QUICHE_SEND_INFOS_OFFSET +
2 * Quiche.SIZEOF_QUICHE_SEND_INFO);
recvInfoBuffer = Quiche.allocateNativeOrder(2 * TOTAL_RECV_INFO_SIZE);
sendInfoBuffer = Quiche.allocateNativeOrder(2 * Quiche.SIZEOF_QUICHE_SEND_INFO);

// Let's memset the memory.
infoBuffer.setZero(0, infoBuffer.capacity());
recvInfoBuffer.setZero(0, recvInfoBuffer.capacity());
sendInfoBuffer.setZero(0, sendInfoBuffer.capacity());

recvInfoBuffer1 = recvInfoBuffer.nioBuffer(0, TOTAL_RECV_INFO_SIZE);
recvInfoBuffer2 = recvInfoBuffer.nioBuffer(TOTAL_RECV_INFO_SIZE, TOTAL_RECV_INFO_SIZE);

sendInfoBuffer1 = sendInfoBuffer.nioBuffer(0, Quiche.SIZEOF_QUICHE_SEND_INFO);
sendInfoBuffer2 = sendInfoBuffer.nioBuffer(Quiche.SIZEOF_QUICHE_SEND_INFO, Quiche.SIZEOF_QUICHE_SEND_INFO);
}

void free() {
Expand All @@ -65,7 +82,8 @@ void free() {
}
if (release) {
refCnt.release();
infoBuffer.release();
recvInfoBuffer.release();
sendInfoBuffer.release();
}
}

Expand Down Expand Up @@ -97,43 +115,40 @@ long address() {
return connection;
}

private long sendInfosAddress() {
return infoBuffer.memoryAddress() + QUICHE_SEND_INFOS_OFFSET;
}
void initInfo(InetSocketAddress address) {
assert connection != -1;
assert recvInfoBuffer.refCnt() != 0;
assert sendInfoBuffer.refCnt() != 0;

void initInfoAddresses(InetSocketAddress address) {
// Fill both quiche_recv_info structs with the same address.
QuicheRecvInfo.write(infoBuffer.memoryAddress(), address);
QuicheRecvInfo.write(infoBuffer.memoryAddress() + TOTAL_RECV_INFO_SIZE, address);
QuicheRecvInfo.setRecvInfo(recvInfoBuffer1, address);
QuicheRecvInfo.setRecvInfo(recvInfoBuffer2, address);

// Fill both quiche_send_info structs with the same address.
long sendInfosAddress = sendInfosAddress();
QuicheSendInfo.write(sendInfosAddress, address);
QuicheSendInfo.write(sendInfosAddress + Quiche.SIZEOF_QUICHE_SEND_INFO, address);
QuicheSendInfo.setSendInfo(sendInfoBuffer1, address);
QuicheSendInfo.setSendInfo(sendInfoBuffer2, address);
}

long recvInfoAddress() {
return infoBuffer.memoryAddress();
ByteBuffer nextRecvInfo() {
assert recvInfoBuffer.refCnt() != 0;
recvInfoFirst = !recvInfoFirst;
return recvInfoFirst ? recvInfoBuffer1 : recvInfoBuffer2;
}

long sendInfoAddress() {
return sendInfosAddress();
ByteBuffer nextSendInfo() {
assert sendInfoBuffer.refCnt() != 0;
sendInfoFirst = !sendInfoFirst;
return sendInfoFirst ? sendInfoBuffer1 : sendInfoBuffer2;
}

long nextRecvInfoAddress(long previousRecvInfoAddress) {
long memoryAddress = infoBuffer.memoryAddress();
if (memoryAddress == previousRecvInfoAddress) {
return memoryAddress + TOTAL_RECV_INFO_SIZE;
}
return memoryAddress;
boolean isSendInfoChanged() {
assert sendInfoBuffer.refCnt() != 0;
return !QuicheSendInfo.isSameAddress(sendInfoBuffer1, sendInfoBuffer2);
}

long nextSendInfoAddress(long previousSendInfoAddress) {
long memoryAddress = sendInfosAddress();
if (memoryAddress == previousSendInfoAddress) {
return memoryAddress + Quiche.SIZEOF_QUICHE_SEND_INFO;
}
return memoryAddress;
boolean isRecvInfoChanged() {
assert recvInfoBuffer.refCnt() != 0;
return !QuicheRecvInfo.isSameAddress(recvInfoBuffer1, recvInfoBuffer2);
}

boolean isClosed() {
Expand All @@ -151,5 +166,4 @@ protected void finalize() throws Throwable {
super.finalize();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,10 @@ private QuicheQuicChannel handleServer(ChannelHandlerContext ctx, InetSocketAddr

QuicheQuicSslEngine quicSslEngine = (QuicheQuicSslEngine) engine;
QuicheQuicConnection connection = quicSslEngine.createConnection(ssl -> {
long peerAddr = sockaddrMemory.memoryAddress();
int peerLen = SockaddrIn.write(peerAddr, sender);
return Quiche.quiche_conn_new_with_tls(scidAddr, scidLen, ocidAddr, ocidLen, peerAddr, peerLen,
ByteBuffer peerAddrMemory = sockaddrMemory.internalNioBuffer(0, sockaddrMemory.capacity());
int peerLen = SockaddrIn.setAddress(peerAddrMemory, sender);
return Quiche.quiche_conn_new_with_tls(scidAddr, scidLen, ocidAddr, ocidLen,
Quiche.memoryAddressWithPosition(peerAddrMemory), peerLen,
config.nativeAddress(), ssl, true);
});
if (connection == null) {
Expand Down
Loading

0 comments on commit fdd5be1

Please sign in to comment.