Skip to content

Commit

Permalink
Merge pull request #618 from alvasw/create_network_envelope_socket
Browse files Browse the repository at this point in the history
Hide socket in NetworkEnvelopeSocket
  • Loading branch information
alvasw authored Dec 31, 2022
2 parents e5c16e6 + 5476528 commit 009508e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
Expand Down Expand Up @@ -69,11 +67,10 @@ public interface Listener {
@Getter
private final Metrics metrics;

private final Socket socket;
private NetworkEnvelopeSocket networkEnvelopeSocket;

private final Handler handler;
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
private OutputStream outputStream;
private InputStream inputStream;
@Nullable
private Future<?> future;

Expand All @@ -90,15 +87,13 @@ protected Connection(Socket socket,
Metrics metrics,
Handler handler,
BiConsumer<Connection, Exception> errorHandler) {
this.socket = socket;
this.peersCapability = peersCapability;
this.peersLoad = peersLoad;
this.handler = handler;
this.metrics = metrics;

try {
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
this.networkEnvelopeSocket = new NetworkEnvelopeSocket(socket);
} catch (IOException exception) {
log.error("Could not create objectOutputStream/objectInputStream for socket " + socket, exception);
errorHandler.accept(this, exception);
Expand All @@ -110,7 +105,7 @@ protected Connection(Socket socket,
Thread.currentThread().setName("Connection.read-" + getThreadNameId());
try {
while (isInputStreamActive()) {
var proto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream);
var proto = networkEnvelopeSocket.receiveNextEnvelope();
// parsing might need some time wo we check again if connection is still active
if (isInputStreamActive()) {
checkNotNull(proto, "Proto from NetworkEnvelope.parseDelimitedFrom(inputStream) must not be null");
Expand Down Expand Up @@ -153,10 +148,7 @@ Connection send(NetworkMessage networkMessage, AuthorizationToken authorizationT
boolean sent = false;
synchronized (writeLock) {
try {
bisq.network.protobuf.NetworkEnvelope proto = checkNotNull(networkEnvelope.toProto(),
"networkEnvelope.toProto() must not be null");
proto.writeDelimitedTo(outputStream);
outputStream.flush();
networkEnvelopeSocket.send(networkEnvelope);
sent = true;
} catch (Throwable throwable) {
if (!isStopped) {
Expand Down Expand Up @@ -201,7 +193,7 @@ void close(CloseReason closeReason) {
future.cancel(true);
}
try {
socket.close();
networkEnvelopeSocket.close();
} catch (IOException ignore) {
}
NetworkService.DISPATCHER.submit(() -> {
Expand Down Expand Up @@ -248,7 +240,7 @@ public boolean isRunning() {
@Override
public String toString() {
return "'" + getClass().getSimpleName() + " [peerAddress=" + getPeersCapability().getAddress() +
", socket=" + socket +
", socket=" + networkEnvelopeSocket +
", keyId=" + getId() + "]'";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.UUID;

/**
Expand All @@ -44,11 +41,12 @@
public final class ConnectionHandshake {
@Getter
private final String id = StringUtils.createUid();
private final Socket socket;
private final BanList banList;
private final Capability capability;
private final AuthorizationService authorizationService;

private NetworkEnvelopeSocket networkEnvelopeSocket;

@Getter
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -123,7 +121,6 @@ static final class Result {
int socketTimeout,
Capability capability,
AuthorizationService authorizationService) {
this.socket = socket;
this.banList = banList;
this.capability = capability;
this.authorizationService = authorizationService;
Expand All @@ -132,7 +129,9 @@ static final class Result {
// socket.setTcpNoDelay(true);
// socket.setSoLinger(true, 100);
socket.setSoTimeout(socketTimeout);
} catch (SocketException e) {

this.networkEnvelopeSocket = new NetworkEnvelopeSocket(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
Expand All @@ -141,21 +140,18 @@ static final class Result {
Result start(Load myLoad, Address peerAddress) {
try {
Metrics metrics = new Metrics();
OutputStream outputStream = socket.getOutputStream();
Request request = new Request(capability, myLoad);
AuthorizationToken token = authorizationService.createToken(request,
Load.INITIAL_LOAD,
peerAddress.getFullAddress(),
0);
NetworkEnvelope requestNetworkEnvelope = new NetworkEnvelope(NetworkEnvelope.VERSION, token, request);
long ts = System.currentTimeMillis();
bisq.network.protobuf.NetworkEnvelope requestProto = requestNetworkEnvelope.toProto();
requestProto.writeDelimitedTo(outputStream);
outputStream.flush();

networkEnvelopeSocket.send(requestNetworkEnvelope);
metrics.onSent(requestNetworkEnvelope);

InputStream inputStream = socket.getInputStream();
bisq.network.protobuf.NetworkEnvelope responseProto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream);
bisq.network.protobuf.NetworkEnvelope responseProto = networkEnvelopeSocket.receiveNextEnvelope();
if (responseProto == null) {
throw new ConnectionException("Response NetworkEnvelope protobuf is null");
}
Expand Down Expand Up @@ -191,7 +187,7 @@ Result start(Load myLoad, Address peerAddress) {
return new Result(response.getCapability(), response.getLoad(), metrics);
} catch (Exception e) {
try {
socket.close();
networkEnvelopeSocket.close();
} catch (IOException ignore) {
}
if (e instanceof ConnectionException) {
Expand All @@ -206,8 +202,7 @@ Result start(Load myLoad, Address peerAddress) {
Result onSocket(Load myLoad) {
try {
Metrics metrics = new Metrics();
InputStream inputStream = socket.getInputStream();
bisq.network.protobuf.NetworkEnvelope requestProto = bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream);
bisq.network.protobuf.NetworkEnvelope requestProto = networkEnvelopeSocket.receiveNextEnvelope();
if (requestProto == null) {
throw new ConnectionException("Request NetworkEnvelope protobuf is null");
}
Expand Down Expand Up @@ -241,20 +236,17 @@ Result onSocket(Load myLoad) {
log.debug("Clients capability {}, load={}", request.getCapability(), request.getLoad());
metrics.onReceived(requestNetworkEnvelope);

OutputStream outputStream = socket.getOutputStream();
Response response = new Response(capability, myLoad);
AuthorizationToken token = authorizationService.createToken(response, request.getLoad(), peerAddress.getFullAddress(), 0);
NetworkEnvelope responseNetworkEnvelope = new NetworkEnvelope(NetworkEnvelope.VERSION, token, response);
bisq.network.protobuf.NetworkEnvelope responseProto = responseNetworkEnvelope.toProto();
responseProto.writeDelimitedTo(outputStream);
outputStream.flush();
networkEnvelopeSocket.send(responseNetworkEnvelope);

metrics.onSent(responseNetworkEnvelope);
metrics.addRtt(System.currentTimeMillis() - ts);
return new Result(request.getCapability(), request.getLoad(), metrics);
} catch (Exception e) {
try {
socket.close();
networkEnvelopeSocket.close();
} catch (IOException ignore) {
}
if (e instanceof ConnectionException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.network.p2p.node;

import bisq.network.p2p.message.NetworkEnvelope;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import static com.google.common.base.Preconditions.checkNotNull;

@Slf4j
public class NetworkEnvelopeSocket implements Closeable {
private final Socket socket;
private final InputStream inputStream;
private final OutputStream outputStream;

public NetworkEnvelopeSocket(Socket socket) throws IOException {
this.socket = socket;
this.inputStream = socket.getInputStream();
this.outputStream = socket.getOutputStream();
}

public void send(NetworkEnvelope networkEnvelope) throws IOException {
bisq.network.protobuf.NetworkEnvelope proto = checkNotNull(networkEnvelope.toProto(),
"networkEnvelope.toProto() must not be null");
proto.writeDelimitedTo(outputStream);
outputStream.flush();
}

public bisq.network.protobuf.NetworkEnvelope receiveNextEnvelope() throws IOException {
return bisq.network.protobuf.NetworkEnvelope.parseDelimitedFrom(inputStream);
}

@Override
public void close() throws IOException {
socket.close();
}
}

0 comments on commit 009508e

Please sign in to comment.