Skip to content

Commit

Permalink
common: Rework, centralizde InterruptibleChannel logic
Browse files Browse the repository at this point in the history
Make the begin/end/handle-interrupt logic for Channels a little bit more
concise.

#158
  • Loading branch information
kohlschuetter committed Jun 30, 2024
1 parent b33b2e3 commit 4cef55e
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,47 @@ public final AFDatagramChannel<A> disconnect() throws IOException {

@Override
public final A receive(ByteBuffer dst) throws IOException {
return afSocket.getAFImpl().receive(dst);
boolean complete = false;
try {
begin();
A ret = afSocket.getAFImpl().receive(dst);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

@Override
public final int send(ByteBuffer src, SocketAddress target) throws IOException {
return afSocket.getAFImpl().send(src, target);
boolean complete = false;
try {
begin();
int ret = afSocket.getAFImpl().send(src, target);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

@Override
public final int read(ByteBuffer dst) throws IOException {
return afSocket.getAFImpl().read(dst, null);
boolean complete = false;
try {
begin();
int ret = afSocket.getAFImpl().read(dst, null);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

@Override
Expand All @@ -166,7 +196,17 @@ public final long read(ByteBuffer[] dsts, int offset, int length) throws IOExcep

@Override
public final int write(ByteBuffer src) throws IOException {
return afSocket.getAFImpl().write(src);
boolean complete = false;
try {
begin();
int ret = afSocket.getAFImpl().write(src);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedByInterruptException;
import java.net.StandardProtocolFamily;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
Expand Down Expand Up @@ -110,7 +108,6 @@ public final AFServerSocket<A> socket() {
return afSocket;
}

@SuppressWarnings("null")
@Override
public AFSocketChannel<A> accept() throws IOException {
boolean complete = false;
Expand All @@ -119,37 +116,13 @@ public AFSocketChannel<A> accept() throws IOException {
AFSocket<A> socket = afSocket.accept1(false);
complete = true;
return socket == null ? null : socket.getChannel();
} catch (SocketClosedByInterruptException e) {
throw closeAndThrow(e.asClosedByInterruptException()); // NOPMD.PreserveStackTrace
} catch (BrokenPipeSocketException | ClosedChannelException e) { // NOPMD.ExceptionAsFlowControl
if (Thread.currentThread().isInterrupted()) {
throw closeAndThrow((ClosedByInterruptException) new ClosedByInterruptException().initCause(
e));
} else if (e instanceof BrokenPipeSocketException) { // NOPMD.AvoidInstanceofChecksInCatchClause
throw closeAndThrow((ClosedChannelException) new ClosedChannelException().initCause(e));
} else {
throw closeAndThrow(e);
}
} finally { // NOPMD.DoNotThrowExceptionInFinally
try {
end(complete);
} catch (ClosedByInterruptException e) {
throw closeAndThrow(e);
}
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

private <T extends Exception> T closeAndThrow(@NonNull T exc) {
if (isOpen()) {
try {
close();
} catch (IOException e2) {
exc.addSuppressed(e2);
}
}
return exc;
}

@Override
public final @Nullable A getLocalAddress() {
return getLocalSocketAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
Expand Down Expand Up @@ -199,7 +197,6 @@ public final boolean isConnectionPending() {
return connectPending.get();
}

@SuppressWarnings("null")
@Override
public final boolean connect(SocketAddress remote) throws IOException {
boolean complete = false;
Expand All @@ -211,25 +208,10 @@ public final boolean connect(SocketAddress remote) throws IOException {
}
complete = true;
return connected;
} catch (SocketClosedByInterruptException e) {
throw closeAndThrow(e.asClosedByInterruptException()); // NOPMD.PreserveStackTrace
} catch (BrokenPipeSocketException | ClosedChannelException e) { // NOPMD.ExceptionAsFlowControl
if (Thread.currentThread().isInterrupted()) {
throw closeAndThrow((ClosedByInterruptException) new ClosedByInterruptException().initCause(
e));
} else if (e instanceof BrokenPipeSocketException) { // NOPMD.AvoidInstanceofChecksInCatchClause
throw closeAndThrow((ClosedChannelException) new ClosedChannelException().initCause(e));
} else {
throw closeAndThrow(e);
}
} catch (SocketClosedException e) {
throw closeAndThrow(e);
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
try {
end(complete);
} catch (ClosedByInterruptException e) {
throw closeAndThrow(e);
}
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

Expand All @@ -241,12 +223,21 @@ public final boolean finishConnect() throws IOException {
return false;
}

boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
|| isConnected();
if (connected) {
connectPending.set(false);
boolean complete = false;
try {
begin();
boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
|| isConnected();
if (connected) {
connectPending.set(false);
}
complete = true;
return connected;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
return connected;
}

@Override
Expand All @@ -259,45 +250,19 @@ public final A getRemoteSocketAddress() {
return afSocket.getRemoteSocketAddress();
}

@SuppressWarnings("null")
@Override
public final int read(ByteBuffer dst) throws IOException {
boolean completed = false;
boolean complete = false;
try {
begin();
int read = afSocket.getAFImpl().read(dst, null);
completed = true;
complete = true;
return read;
} catch (SocketClosedByInterruptException e) {
throw closeAndThrow(e.asClosedByInterruptException()); // NOPMD.PreserveStackTrace
} catch (BrokenPipeSocketException | ClosedChannelException e) {
if (Thread.currentThread().isInterrupted()) {
throw closeAndThrow((ClosedByInterruptException) new ClosedByInterruptException().initCause(
e));
} else if (e instanceof BrokenPipeSocketException) { // NOPMD.AvoidInstanceofChecksInCatchClause
throw closeAndThrow((ClosedChannelException) new ClosedChannelException().initCause(e));
} else {
throw closeAndThrow(e);
}
} finally {
end(completed);
}
}

private <T extends Exception> T closeAndThrow(@NonNull T exc) {
if (isOpen()) {
try {
close();
} catch (IOException e2) {
exc.addSuppressed(e2);
}
try {
getAFSocket().close();
} catch (IOException e2) {
exc.addSuppressed(e2);
}
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
return exc;
}

@Override
Expand All @@ -318,28 +283,18 @@ public final long write(ByteBuffer[] srcs, int offset, int length) throws IOExce
return write(srcs[offset]);
}

@SuppressWarnings("null")
@Override
public final int write(ByteBuffer src) throws IOException {
boolean completed = false;
boolean complete = false;
try {
begin();
int written = afSocket.getAFImpl().write(src);
completed = true;
complete = true;
return written;
} catch (SocketClosedByInterruptException e) {
throw closeAndThrow(e.asClosedByInterruptException()); // NOPMD.PreserveStackTrace
} catch (BrokenPipeSocketException | ClosedChannelException e) {
if (Thread.currentThread().isInterrupted()) {
throw closeAndThrow((ClosedByInterruptException) new ClosedByInterruptException().initCause(
e));
} else if (e instanceof BrokenPipeSocketException) { // NOPMD.AvoidInstanceofChecksInCatchClause
throw closeAndThrow((ClosedChannelException) new ClosedChannelException().initCause(e));
} else {
throw closeAndThrow(e);
}
} finally {
end(completed);
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;

import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.spi.AbstractInterruptibleChannel;

import org.eclipse.jdt.annotation.NonNull;

/**
* Helper methods when working with {@link AbstractInterruptibleChannel} subclasses.
*
* @author Christian Kohlschütter
*/
final class InterruptibleChannelUtil {
/**
* Reference to the protected {@code AbstractInterruptibleChannel#end(boolean)} method.
*/
@FunctionalInterface
interface EndMethod {
void end(boolean completed) throws AsynchronousCloseException;
}

/**
* Wrapper method that calls {@code AbstractInterruptibleChannel#end(boolean)}, making sure the
* socket is closed and the {@link Thread#interrupted()} state is set correctly upon error.
*
* @param channel The channel.
* @param end The reference to the protected {@code AbstractInterruptibleChannel#end(boolean)}
* method.
* @param complete {@code true} if the block started with {@code begin} succeeded without an
* exception.
* @throws AsynchronousCloseException on error.
*/
static void endInterruptable(AbstractInterruptibleChannel channel, EndMethod end,
boolean complete) throws AsynchronousCloseException {
try {
end.end(complete);
} catch (AsynchronousCloseException e) {
throw closeAndThrow(channel, e);
}
}

private static <T extends Exception> T closeAndThrow(AbstractInterruptibleChannel channel,
@NonNull T exc) {
if (channel.isOpen()) {
try {
channel.close();
} catch (IOException e2) {
exc.addSuppressed(e2);
}
}
return exc;
}

/**
* Makes sure that upon an exception that is documented to have the channel be closed the channel
* is indeed closed before throwing that exception. If the exception is also documented to have
* the "Thread interrupted" state be set, make sure that this state is actually set as well.
*
* @param channel The channel to work with.
* @param e The exception
* @return The exception.
*/
static IOException handleException(AbstractInterruptibleChannel channel, IOException e) {
if (e instanceof SocketClosedException || e instanceof ClosedChannelException
|| e instanceof BrokenPipeSocketException) {
if (e instanceof SocketClosedByInterruptException
|| e instanceof ClosedByInterruptException) {
Thread t = Thread.currentThread();
if (!t.isInterrupted()) {
t.interrupt();
}
}
return closeAndThrow(channel, e);
} else {
return e;
}
}
}

0 comments on commit 4cef55e

Please sign in to comment.