diff --git a/network/network/src/main/java/bisq/network/p2p/node/Connection.java b/network/network/src/main/java/bisq/network/p2p/node/Connection.java index d4dd3ae081..61e3135628 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Connection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Connection.java @@ -28,8 +28,6 @@ import javax.annotation.Nullable; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.Socket; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -69,11 +67,10 @@ public interface Listener { @Getter private final Metrics metrics; - private final Socket socket; + private NetworkEnvelopeSocket networkEnvelopeSocket; + private final Handler handler; private final Set listeners = new CopyOnWriteArraySet<>(); - private OutputStream outputStream; - private InputStream inputStream; @Nullable private Future future; @@ -90,15 +87,13 @@ protected Connection(Socket socket, Metrics metrics, Handler handler, BiConsumer errorHandler) { - this.socket = socket; this.peersCapability = peersCapability; this.peersLoad = peersLoad; this.handler = handler; this.metrics = metrics; try { - outputStream = socket.getOutputStream(); - inputStream = socket.getInputStream(); + this.networkEnvelopeSocket = new NetworkEnvelopeSocket(socket); } catch (IOException exception) { log.error("Could not create objectOutputStream/objectInputStream for socket " + socket, exception); errorHandler.accept(this, exception); @@ -110,7 +105,7 @@ protected Connection(Socket socket, Thread.currentThread().setName("Connection.read-" + getThreadNameId()); try { while (isInputStreamActive()) { - var proto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream); + var proto = networkEnvelopeSocket.receiveNextEnvelope(); // parsing might need some time wo we check again if connection is still active if (isInputStreamActive()) { checkNotNull(proto, "Proto from NetworkEnvelope.parseDelimitedFrom(inputStream) must not be null"); @@ -153,10 +148,7 @@ Connection send(NetworkMessage networkMessage, AuthorizationToken authorizationT boolean sent = false; synchronized (writeLock) { try { - bisq.network.protobuf.NetworkEnvelope proto = checkNotNull(networkEnvelope.toProto(), - "networkEnvelope.toProto() must not be null"); - proto.writeDelimitedTo(outputStream); - outputStream.flush(); + networkEnvelopeSocket.send(networkEnvelope); sent = true; } catch (Throwable throwable) { if (!isStopped) { @@ -201,7 +193,7 @@ void close(CloseReason closeReason) { future.cancel(true); } try { - socket.close(); + networkEnvelopeSocket.close(); } catch (IOException ignore) { } NetworkService.DISPATCHER.submit(() -> { @@ -248,7 +240,7 @@ public boolean isRunning() { @Override public String toString() { return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersCapability().getAddress() + - ", socket=" + socket + + ", socket=" + networkEnvelopeSocket + ", keyId=" + getId() + "]'"; } diff --git a/network/network/src/main/java/bisq/network/p2p/node/ConnectionHandshake.java b/network/network/src/main/java/bisq/network/p2p/node/ConnectionHandshake.java index 7213b97d5f..e74aea0646 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/ConnectionHandshake.java +++ b/network/network/src/main/java/bisq/network/p2p/node/ConnectionHandshake.java @@ -29,10 +29,7 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.Socket; -import java.net.SocketException; import java.util.UUID; /** @@ -44,11 +41,12 @@ public final class ConnectionHandshake { @Getter private final String id = StringUtils.createUid(); - private final Socket socket; private final BanList banList; private final Capability capability; private final AuthorizationService authorizationService; + private NetworkEnvelopeSocket networkEnvelopeSocket; + @Getter @ToString @EqualsAndHashCode @@ -123,7 +121,6 @@ static final class Result { int socketTimeout, Capability capability, AuthorizationService authorizationService) { - this.socket = socket; this.banList = banList; this.capability = capability; this.authorizationService = authorizationService; @@ -132,7 +129,9 @@ static final class Result { // socket.setTcpNoDelay(true); // socket.setSoLinger(true, 100); socket.setSoTimeout(socketTimeout); - } catch (SocketException e) { + + this.networkEnvelopeSocket = new NetworkEnvelopeSocket(socket); + } catch (IOException e) { e.printStackTrace(); } } @@ -141,7 +140,6 @@ static final class Result { Result start(Load myLoad, Address peerAddress) { try { Metrics metrics = new Metrics(); - OutputStream outputStream = socket.getOutputStream(); Request request = new Request(capability, myLoad); AuthorizationToken token = authorizationService.createToken(request, Load.INITIAL_LOAD, @@ -149,13 +147,11 @@ Result start(Load myLoad, Address peerAddress) { 0); NetworkEnvelope requestNetworkEnvelope = new NetworkEnvelope(NetworkEnvelope.VERSION, token, request); long ts = System.currentTimeMillis(); - bisq.network.protobuf.NetworkEnvelope requestProto = requestNetworkEnvelope.toProto(); - requestProto.writeDelimitedTo(outputStream); - outputStream.flush(); + + networkEnvelopeSocket.send(requestNetworkEnvelope); metrics.onSent(requestNetworkEnvelope); - InputStream inputStream = socket.getInputStream(); - bisq.network.protobuf.NetworkEnvelope responseProto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream); + bisq.network.protobuf.NetworkEnvelope responseProto = networkEnvelopeSocket.receiveNextEnvelope(); if (responseProto == null) { throw new ConnectionException("Response NetworkEnvelope protobuf is null"); } @@ -191,7 +187,7 @@ Result start(Load myLoad, Address peerAddress) { return new Result(response.getCapability(), response.getLoad(), metrics); } catch (Exception e) { try { - socket.close(); + networkEnvelopeSocket.close(); } catch (IOException ignore) { } if (e instanceof ConnectionException) { @@ -206,8 +202,7 @@ Result start(Load myLoad, Address peerAddress) { Result onSocket(Load myLoad) { try { Metrics metrics = new Metrics(); - InputStream inputStream = socket.getInputStream(); - bisq.network.protobuf.NetworkEnvelope requestProto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream); + bisq.network.protobuf.NetworkEnvelope requestProto = networkEnvelopeSocket.receiveNextEnvelope(); if (requestProto == null) { throw new ConnectionException("Request NetworkEnvelope protobuf is null"); } @@ -241,20 +236,17 @@ Result onSocket(Load myLoad) { log.debug("Clients capability {}, load={}", request.getCapability(), request.getLoad()); metrics.onReceived(requestNetworkEnvelope); - OutputStream outputStream = socket.getOutputStream(); Response response = new Response(capability, myLoad); AuthorizationToken token = authorizationService.createToken(response, request.getLoad(), peerAddress.getFullAddress(), 0); NetworkEnvelope responseNetworkEnvelope = new NetworkEnvelope(NetworkEnvelope.VERSION, token, response); - bisq.network.protobuf.NetworkEnvelope responseProto = responseNetworkEnvelope.toProto(); - responseProto.writeDelimitedTo(outputStream); - outputStream.flush(); + networkEnvelopeSocket.send(responseNetworkEnvelope); metrics.onSent(responseNetworkEnvelope); metrics.addRtt(System.currentTimeMillis() - ts); return new Result(request.getCapability(), request.getLoad(), metrics); } catch (Exception e) { try { - socket.close(); + networkEnvelopeSocket.close(); } catch (IOException ignore) { } if (e instanceof ConnectionException) { diff --git a/network/network/src/main/java/bisq/network/p2p/node/NetworkEnvelopeSocket.java b/network/network/src/main/java/bisq/network/p2p/node/NetworkEnvelopeSocket.java new file mode 100644 index 0000000000..1508a84d2e --- /dev/null +++ b/network/network/src/main/java/bisq/network/p2p/node/NetworkEnvelopeSocket.java @@ -0,0 +1,58 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.node; + +import bisq.network.p2p.message.NetworkEnvelope; +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import static com.google.common.base.Preconditions.checkNotNull; + +@Slf4j +public class NetworkEnvelopeSocket implements Closeable { + private final Socket socket; + private final InputStream inputStream; + private final OutputStream outputStream; + + public NetworkEnvelopeSocket(Socket socket) throws IOException { + this.socket = socket; + this.inputStream = socket.getInputStream(); + this.outputStream = socket.getOutputStream(); + } + + public void send(NetworkEnvelope networkEnvelope) throws IOException { + bisq.network.protobuf.NetworkEnvelope proto = checkNotNull(networkEnvelope.toProto(), + "networkEnvelope.toProto() must not be null"); + proto.writeDelimitedTo(outputStream); + outputStream.flush(); + } + + public bisq.network.protobuf.NetworkEnvelope receiveNextEnvelope() throws IOException { + return bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream); + } + + @Override + public void close() throws IOException { + socket.close(); + } +}