Skip to content

Commit

Permalink
common: FileDescriptorCast: Update blocking state as necessary
Browse files Browse the repository at this point in the history
Previously, casting a FileDescriptor to Socket would not reset its
(cached) blocking state to "blocking" if it was configured non-blocking,
which may lead to unexpected behavior.

Check the "blocking" state upon casting and set it accordingly when a
Socket/DatagramSocket/ServerSocket is required (since they are expected
to be blocking by default).

When casting to a SocketChannel/DatagramChannel/ServerSocketChannel,
adjust the Java-cached "blocking" state to the actual blocking state of
the native socket.

Moreover, add support to declare "we don't know the state" (like in
Windows, which is currently not supported for casting, though), and
ensure we get the cached state and the native state back in sync.

#151
  • Loading branch information
kohlschuetter committed Jan 29, 2024
1 parent 493788a commit 5c51df5
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* @param <A> The supported address type.
*/
public abstract class AFDatagramChannel<A extends AFSocketAddress> extends DatagramChannel
implements AFSomeSocket, AFSocketExtensions {
implements AFSomeSocket, AFSocketExtensions, AFSomeSocketChannel {
private final AFDatagramSocket<A> afSocket;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* @author Christian Kohlschütter
*/
public abstract class AFServerSocketChannel<A extends AFSocketAddress> extends ServerSocketChannel
implements FileDescriptorAccess {
implements FileDescriptorAccess, AFSomeSocketChannel {
private final @NonNull AFServerSocket<A> afSocket;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* @author Christian Kohlschütter
*/
public abstract class AFSocketChannel<A extends AFSocketAddress> extends SocketChannel implements
AFSomeSocket, AFSocketExtensions {
AFSomeSocket, AFSocketExtensions, AFSomeSocketChannel {
private final @NonNull AFSocket<A> afSocket;
private final AtomicBoolean connectPending = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* junixsocket
*
* Copyright 2009-2023 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
* Marker interface that combines junixsocket-based {@link SocketChannel}s, {@link DatagramChannel}s
* or {@link ServerSocketChannel}s.
*
* @author Christian Kohlschütter
* @see AFSocketChannel
* @see AFServerSocketChannel
* @see AFDatagramChannel
*/
public interface AFSomeSocketChannel extends Closeable, FileDescriptorAccess {
/**
* Checks if the channel is configured blocking. The result may be cached, and therefore not
* invoke native code to check if the underlying socket is actually configured that way.
*
* @return {@code true} if blocking.
*/
boolean isBlocking();

/**
* Adjusts this channel's blocking mode.
*
* <p>
* If the given blocking mode is different from the currently cached blocking mode then this
* method native code to change it.
* </p>
*
* @param block {@code true} if blocking is desired.
* @return This channel.
* @throws IOException on error.
*/
SelectableChannel configureBlocking(boolean block) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import java.io.InputStream;
import java.lang.ProcessBuilder.Redirect;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.DatagramChannel;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -66,6 +71,11 @@
* that do not encode this information directly (such as {@link AFUNIXSocketAddress} and
* {@link AFTIPCSocketAddress}).
* </p>
* <p>
* Also note that the "blocking" state of a socket may be forcibly changed to "blocking" when
* performing the cast, especially when casting to {@link Socket}, {@link DatagramSocket} or
* {@link ServerSocket} and any of their subclasses where "blocking" is the expected state.
* </p>
*
* @author Christian Kohlschütter
*/
Expand Down Expand Up @@ -225,21 +235,24 @@ static <A extends AFSocketAddress> void registerCastingProviders(
protected void addProviders() {
addProviders(GLOBAL_PROVIDERS);

final CastingProvider<AFSocket<A>> cpSocket = (fdc, desiredType) -> AFSocket.newInstance(
config.socketConstructor(), (AFSocketFactory<A>) null, fdc.getFileDescriptor(),
fdc.localPort, fdc.remotePort);
final CastingProvider<AFServerSocket<A>> cpServerSocket = (fdc,
desiredType) -> AFServerSocket.newInstance(config.serverSocketConstructor(), fdc
.getFileDescriptor(), fdc.localPort, fdc.remotePort);
final CastingProviderSocketOrChannel<AFSocket<A>> cpSocketOrChannel = (fdc, desiredType,
isChannel) -> reconfigure(isChannel, AFSocket.newInstance(config.socketConstructor(),
(AFSocketFactory<A>) null, fdc.getFileDescriptor(), fdc.localPort, fdc.remotePort));
final CastingProviderSocketOrChannel<AFServerSocket<A>> cpServerSocketOrChannel = (fdc,
desiredType, isChannel) -> reconfigure(isChannel, AFServerSocket.newInstance(config
.serverSocketConstructor(), fdc.getFileDescriptor(), fdc.localPort,
fdc.remotePort));

registerGenericSocketProviders();

addProvider(socketClass, cpSocket);
addProvider(config.serverSocketClass(), cpServerSocket);
addProvider(config.socketChannelClass(), (fdc, desiredType) -> cpSocket.provideAs(fdc,
AFSocket.class).getChannel());
addProvider(config.serverSocketChannelClass(), (fdc, desiredType) -> cpServerSocket
.provideAs(fdc, AFServerSocket.class).getChannel());
addProvider(socketClass, (fdc, desiredType) -> cpSocketOrChannel.provideAs(fdc, desiredType,
false));
addProvider(config.serverSocketClass(), (fdc, desiredType) -> cpServerSocketOrChannel
.provideAs(fdc, desiredType, false));
addProvider(config.socketChannelClass(), (fdc, desiredType) -> cpSocketOrChannel.provideAs(
fdc, AFSocket.class, true).getChannel());
addProvider(config.serverSocketChannelClass(), (fdc, desiredType) -> cpServerSocketOrChannel
.provideAs(fdc, AFServerSocket.class, true).getChannel());
}
});

Expand All @@ -250,15 +263,17 @@ protected void addProviders() {
protected void addProviders() {
addProviders(GLOBAL_PROVIDERS);

final CastingProvider<AFDatagramSocket<A>> cpDatagramSocket = (fdc,
desiredType) -> AFDatagramSocket.newInstance(config.datagramSocketConstructor(), fdc
.getFileDescriptor(), fdc.localPort, fdc.remotePort);
final CastingProviderSocketOrChannel<AFDatagramSocket<A>> cpDatagramSocketOrChannel = (fdc,
desiredType, isChannel) -> reconfigure(isChannel, AFDatagramSocket.newInstance(config
.datagramSocketConstructor(), fdc.getFileDescriptor(), fdc.localPort,
fdc.remotePort));

registerGenericDatagramSocketProviders();

addProvider(datagramSocketClass, cpDatagramSocket);
addProvider(config.datagramChannelClass(), (fdc, desiredType) -> cpDatagramSocket.provideAs(
fdc, AFDatagramSocket.class).getChannel());
addProvider(datagramSocketClass, (fdc, desiredType) -> cpDatagramSocketOrChannel.provideAs(
fdc, desiredType, false));
addProvider(config.datagramChannelClass(), (fdc, desiredType) -> cpDatagramSocketOrChannel
.provideAs(fdc, AFDatagramSocket.class, true).getChannel());
}
});
}
Expand All @@ -275,31 +290,35 @@ protected CastingProviderMap() {
}

protected void registerGenericSocketProviders() {
final CastingProvider<AFSocket<AFGenericSocketAddress>> cpSocketGeneric = (fdc,
desiredType) -> AFSocket.newInstance(AFGenericSocket::new,
(AFSocketFactory<AFGenericSocketAddress>) null, fdc.getFileDescriptor(),
fdc.localPort, fdc.remotePort);
final CastingProvider<AFServerSocket<AFGenericSocketAddress>> cpServerSocketGeneric = (fdc,
desiredType) -> AFServerSocket.newInstance(AFGenericServerSocket::new, fdc
.getFileDescriptor(), fdc.localPort, fdc.remotePort);

addProvider(AFGenericSocket.class, cpSocketGeneric);
addProvider(AFGenericServerSocket.class, cpServerSocketGeneric);
addProvider(AFGenericSocketChannel.class, (fdc, desiredType) -> cpSocketGeneric.provideAs(fdc,
AFSocket.class).getChannel());
addProvider(AFGenericServerSocketChannel.class, (fdc, desiredType) -> cpServerSocketGeneric
.provideAs(fdc, AFServerSocket.class).getChannel());
final CastingProviderSocketOrChannel<AFSocket<AFGenericSocketAddress>> cpSocketOrChannelGeneric =
(fdc, desiredType, isChannel) -> reconfigure(isChannel, AFSocket.newInstance(
AFGenericSocket::new, (AFSocketFactory<AFGenericSocketAddress>) null, fdc
.getFileDescriptor(), fdc.localPort, fdc.remotePort));
final CastingProviderSocketOrChannel<AFServerSocket<AFGenericSocketAddress>> cpServerSocketOrChannelGeneric =
(fdc, desiredType, isChannel) -> reconfigure(isChannel, AFServerSocket.newInstance(
AFGenericServerSocket::new, fdc.getFileDescriptor(), fdc.localPort, fdc.remotePort));

addProvider(AFGenericSocket.class, (fdc, desiredType) -> cpSocketOrChannelGeneric.provideAs(
fdc, desiredType, false));
addProvider(AFGenericServerSocket.class, (fdc, desiredType) -> cpServerSocketOrChannelGeneric
.provideAs(fdc, desiredType, false));
addProvider(AFGenericSocketChannel.class, (fdc, desiredType) -> cpSocketOrChannelGeneric
.provideAs(fdc, AFSocket.class, true).getChannel());
addProvider(AFGenericServerSocketChannel.class, (fdc,
desiredType) -> cpServerSocketOrChannelGeneric.provideAs(fdc, AFServerSocket.class, true)
.getChannel());
}

protected void registerGenericDatagramSocketProviders() {
final CastingProvider<AFDatagramSocket<AFGenericSocketAddress>> cpDatagramSocketGeneric = (
fdc, desiredType) -> AFDatagramSocket.newInstance(AFGenericDatagramSocket::new, fdc
.getFileDescriptor(), fdc.localPort, fdc.remotePort);

addProvider(AFDatagramSocket.class, cpDatagramSocketGeneric);
addProvider(AFDatagramChannel.class, (fdc, desiredType) -> cpDatagramSocketGeneric.provideAs(
fdc, AFDatagramSocket.class).getChannel());

final CastingProviderSocketOrChannel<AFDatagramSocket<AFGenericSocketAddress>> cpDatagramSocketOrChannelGeneric =
(fdc, desiredType, isChannel) -> reconfigure(isChannel, AFDatagramSocket.newInstance(
AFGenericDatagramSocket::new, fdc.getFileDescriptor(), fdc.localPort,
fdc.remotePort));

addProvider(AFDatagramSocket.class, (fdc, desiredType) -> cpDatagramSocketOrChannelGeneric
.provideAs(fdc, desiredType, false));
addProvider(AFDatagramChannel.class, (fdc, desiredType) -> cpDatagramSocketOrChannelGeneric
.provideAs(fdc, AFDatagramSocket.class, true).getChannel());
}

protected abstract void addProviders();
Expand Down Expand Up @@ -340,6 +359,12 @@ private interface CastingProvider<T> {
T provideAs(FileDescriptorCast fdc, Class<? super T> desiredType) throws IOException;
}

@FunctionalInterface
private interface CastingProviderSocketOrChannel<T> {
T provideAs(FileDescriptorCast fdc, Class<? super T> desiredType, boolean isChannel)
throws IOException;
}

/**
* Creates a {@link FileDescriptorCast} using the given file descriptor.
*
Expand Down Expand Up @@ -555,4 +580,103 @@ protected void addProviders() {
}
});
}

@SuppressWarnings("null")
private static <S extends AFSocket<?>> S reconfigure(boolean isChannel, S socket)
throws IOException {
reconfigure(isChannel, socket.getChannel());
return socket;
}

@SuppressWarnings("null")
private static <S extends AFServerSocket<?>> S reconfigure(boolean isChannel, S socket)
throws IOException {
reconfigure(isChannel, socket.getChannel());
return socket;
}

@SuppressWarnings("null")
private static <S extends AFDatagramSocket<?>> S reconfigure(boolean isChannel, S socket)
throws IOException {
reconfigure(isChannel, socket.getChannel());
return socket;
}

/**
* Reconfigures the Java-side of the socket/socket channel such that its state is compatible with
* the native socket's state. This is necessary to properly configure blocking/non-blocking state,
* as that is cached on the Java side.
* <p>
* If {@code isChannel} is false, then we want to cast to a {@link Socket}, {@link DatagramSocket}
* or {@link ServerSocket}, which means blocking I/O is desired. If the underlying native socket
* is configured non-blocking, we need to reset the state to "blocking" accordingly.
* <p>
* If {@code isChannel} is true, then we want to cast to a {@link SocketChannel},
* {@link DatagramChannel} or {@link ServerSocketChannel}, in which case the blocking state should
* be preserved, if possible. It is then up to the user to check blocking state via
* {@link AbstractSelectableChannel#isBlocking()} prior to using the socket.
* <p>
* Note that on Windows, it may be impossible to query the blocking state from an external socket,
* so the state is always forcibly set to "blocking".
*
* @param <S> The type.
* @param isChannel The desired cast type (socket=set to blocking, or channel=preserve state).
* @param socketChannel The channel.
* @throws IOException on error.
*/
private static <@NonNull S extends AFSomeSocketChannel> void reconfigure(boolean isChannel,
S socketChannel) throws IOException {
if (isChannel) {
reconfigureKeepBlockingState(socketChannel);
} else {
reconfigureSetBlocking(socketChannel);
}
}

private static <@NonNull S extends AFSomeSocketChannel> void reconfigureKeepBlockingState(
S socketChannel) throws IOException {
int result = NativeUnixSocket.checkBlocking(socketChannel.getFileDescriptor());

boolean blocking;
switch (result) {
case 0:
blocking = false;
break;
case 1:
blocking = true;
break;
case 2:
// need to reconfigure/forcibly override any cached result -> set to blocking by default
socketChannel.configureBlocking(false);
socketChannel.configureBlocking(true);
return;
default:
throw new OperationNotSupportedSocketException("Invalid blocking state");
}

socketChannel.configureBlocking(blocking);
}

private static <@NonNull S extends AFSomeSocketChannel> void reconfigureSetBlocking(
S socketChannel) throws IOException {
int result = NativeUnixSocket.checkBlocking(socketChannel.getFileDescriptor());

switch (result) {
case 0:
// see below
break;
case 1:
// already blocking, nothing to do
return;
case 2:
// need to reconfigure/forcibly override any cached result -> set to blocking by default
// see below
break;
default:
throw new OperationNotSupportedSocketException("Invalid blocking state");
}

socketChannel.configureBlocking(false);
socketChannel.configureBlocking(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,15 @@ static native boolean initPipe(FileDescriptor source, FileDescriptor sink, boole

static native void configureBlocking(FileDescriptor fd, boolean blocking) throws IOException;

/**
* Checks if the given file descriptor describes a blocking socket.
*
* @param fd The file descriptor to check
* @return 0 = non-blocking, 1 = blocking, 2 = indeterminate (needs reconfiguration)
* @throws IOException on error.
*/
static native int checkBlocking(FileDescriptor fd) throws IOException;

static native void socketPair(int domain, int type, FileDescriptor fd, FileDescriptor fd2);

static native Redirect initRedirect(FileDescriptor fd);
Expand Down
27 changes: 27 additions & 0 deletions junixsocket-native/src/main/c/filedescriptors.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,33 @@ JNIEXPORT void JNICALL Java_org_newsclub_net_unix_NativeUnixSocket_shutdown
}
}

/*
* Class: org_newsclub_net_unix_NativeUnixSocket
* Method: checkBlocking
* Signature: (Ljava/io/FileDescriptor;)I
*/
JNIEXPORT jint JNICALL Java_org_newsclub_net_unix_NativeUnixSocket_checkBlocking
(JNIEnv *env, jclass clazz CK_UNUSED, jobject fd) {
int handle = _getFD(env, fd);
#if defined(_WIN32)
CK_ARGUMENT_POTENTIALLY_UNUSED(handle);
// Windows doesn't provide current API to check for blocking state
return 2; // "indeterminate; needs re-configure"
#else
int flags = fcntl(handle, F_GETFL);
if(flags == -1) {
_throwErrnumException(env, socket_errno, NULL);
return -1;
}

if((flags & O_NONBLOCK) != 0) {
return 0; // "non-blocking"
} else {
return 1; // "blocking"
}
#endif
}

/*
* Class: org_newsclub_net_unix_NativeUnixSocket
* Method: configureBlocking
Expand Down
Loading

0 comments on commit 5c51df5

Please sign in to comment.