Skip to content

Commit

Permalink
https://github.com/advantageous/qbit/issues/723
Browse files Browse the repository at this point in the history
  • Loading branch information
MammatusPlatypus committed May 12, 2016
1 parent 9df4fa6 commit fce7f6f
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 47 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

ext {
projectVersion = '1.8.3'
projectVersion = '1.8.4'
boonVersion = '0.5.7'
boonGroup = "io.advantageous.boon"
springFrameworkVersion = '4.2.5.RELEASE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import java.lang.ref.WeakReference;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.ConnectException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static io.advantageous.boon.core.Exceptions.die;
import static io.advantageous.boon.core.Str.sputs;
Expand Down Expand Up @@ -184,13 +186,16 @@ public void flush() {
httpServerProxy.flush();
}


/**
* Sends a message over websocket.
*
* @param message message to sendText over WebSocket
* @param serviceName message to sendText over WebSocket
*/
private void send(final String serviceName, final String message) {
private void send(final String serviceName, final String message, final Consumer<Exception> exceptionConsumer) {



if (webSocket == null) {
String webSocketURI;
Expand All @@ -207,8 +212,8 @@ private void send(final String serviceName, final String message) {
if (webSocket.isClosed() && connected()) {
this.webSocket.openAndNotify(netSocket -> {
connected.set(true);
webSocket.sendText(message);
});
webSocket.sendText(message, exceptionConsumer);
}, exceptionConsumer);
} else {
webSocket.sendText(message);
}
Expand All @@ -220,9 +225,15 @@ private void send(final String serviceName, final String message) {

private void wireWebSocket(final String serviceName, final String message) {

this.webSocket.setErrorConsumer(error ->
logger.error(sputs(this.getClass().getName(),
"::Exception calling WebSocket from client proxy", "\nService Name", serviceName, "\nMessage", message), error));
this.webSocket.setErrorConsumer(error -> {

if (error instanceof ConnectException) {
connected.set(false);
}
logger.error(sputs(this.getClass().getName(),
"::Exception calling WebSocket from client proxy",
"\nService Name", serviceName, "\nMessage", message), error);
});

//noinspection Convert2MethodRef
this.webSocket.setTextMessageConsumer(messageFromServer -> handleWebSocketReplyMessage(messageFromServer));
Expand Down Expand Up @@ -266,8 +277,8 @@ public <T> T createProxy(final Class<T> serviceInterface, final String serviceNa
final Sender<String> sender = new Sender<String>() {

@Override
public void send(String returnAddress, String buffer) {
BoonClient.this.send(serviceName, buffer);
public void send(String returnAddress, String buffer, Consumer<Exception> exceptionConsumer) {
BoonClient.this.send(serviceName, buffer, exceptionConsumer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import io.advantageous.qbit.network.NetworkSender;

import java.util.function.Consumer;

/**
* created by rhightower on 10/22/14.
*
Expand All @@ -31,4 +33,9 @@ public interface WebSocketSender extends NetworkSender {
default void openWebSocket(WebSocket webSocket) {
open(webSocket);
}


default void openWebSocket(WebSocket webSocket, Consumer<Exception> exceptionConsumer) {
open(webSocket, exceptionConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* created by rhightower on 2/14/15.
*/
public interface NetSocket {
Consumer<Exception> noOpErrorHandler = e -> {
};

String remoteAddress();

Expand All @@ -41,9 +43,18 @@ public interface NetSocket {

void onError(Exception exception);

void sendText(String string);
void sendText(String string, Consumer<Exception> errorHandler);

void sendBinary(byte[] bytes);

default void sendText(String string) {
sendText(string, noOpErrorHandler);
}

void sendBinary(byte[] bytes, Consumer<Exception> errorHandler);

default void sendBinary(byte[] bytes) {
sendBinary(bytes, noOpErrorHandler);
}

boolean isClosed();

Expand All @@ -63,11 +74,20 @@ public interface NetSocket {

void close();

void open();
void open(Consumer<Exception> exceptionConsumer);

default void open() {
open(noOpErrorHandler);
}

void openAndWait();

void openAndNotify(final Consumer<NetSocket> openConsumer);
void openAndNotify(final Consumer<NetSocket> openConsumer, Consumer<Exception> exceptionConsumer);


default void openAndNotify(final Consumer<NetSocket> openConsumer) {
openAndNotify(openConsumer, noOpErrorHandler);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

package io.advantageous.qbit.network;

import java.util.function.Consumer;

/**
* WebSocket like thing that receives messages.
* Could be mapped to non-websocket implementations.
* created by rhightower on 2/14/15.
*/
public interface NetworkSender {


Consumer<Exception> noOpErrorHandler = e -> {
};

void sendText(String message);

default void sendBytes(byte[] message) {
Expand All @@ -35,5 +41,11 @@ default void close() {
}

default void open(NetSocket netSocket) {
open(netSocket, noOpErrorHandler);
}


default void open(NetSocket netSocket, Consumer<Exception> exceptionConsumer) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,22 @@ public void onError(Exception exception) {
}

@Override
public void sendText(String text) {
public void sendText(final String text, final Consumer<Exception> exceptionConsumer) {
try {
networkSender.sendText(text);
} catch (Exception ex) {
onError(ex);
exceptionConsumer.accept(ex);
}
}

@Override
public void sendBinary(byte[] bytes) {
public void sendBinary(final byte[] bytes, final Consumer<Exception> exceptionConsumer) {
try {
networkSender.sendBytes(bytes);
} catch (Exception ex) {
onError(ex);
exceptionConsumer.accept(ex);
}
}

Expand Down Expand Up @@ -172,26 +174,28 @@ public void close() {
}

@Override
public void open() {
public void open(Consumer<Exception> exceptionConsumer) {
try {
networkSender.open(this);
networkSender.open(this, exceptionConsumer);
} catch (Exception ex) {
exceptionConsumer.accept(ex);
onError(ex);
}
}


@Override
public void openAndNotify(Consumer<NetSocket> openConsumer) {
public void openAndNotify(Consumer<NetSocket> openConsumer, Consumer<Exception> exceptionConsumer) {

this.setOpenConsumer(aVoid -> openConsumer.accept(this));
open();
open(exceptionConsumer);
}

@Override
public void openAndWait() {

open();
open(e -> {
});
/* Try to open for three seconds. */
int count = 5;
while (!open.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import io.advantageous.qbit.service.Stoppable;

import java.util.function.Consumer;

/**
* created by Richard on 10/1/14.
* This could be a TCP/IP connection, a websocket, an HTTP long poll, etc.
Expand All @@ -31,6 +33,7 @@
*/
public interface Sender<T> extends Stoppable {

void send(String returnAddress, T buffer);
void send(String returnAddress, T buffer, Consumer<Exception> exceptionConsumer);

//add a close here
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,30 @@ public void call(List<MethodCall<Object>> methodCalls) {

if (methodCalls.size() > 0) {
String returnAddress = methodCalls.get(0).returnAddress();
sender.send(returnAddress, encoder.encodeMethodCalls(methodCalls.get(0).returnAddress(), methodCalls));
sender.send(returnAddress, encoder.encodeMethodCalls(methodCalls.get(0).returnAddress(), methodCalls), e -> {

methodCalls.forEach(mc -> {
if (mc.callback() != null) {
mc.callback().onError(e);
}
});
});
}
}


@Override
public void flush() {


MethodCall<Object> method = methodCalls.poll();

if (method != null) {

List<MethodCall<Object>> methods;


methods = new ArrayList<>(methodCalls.size());


while (method != null) {
methods.add(method);
method = methodCalls.poll();
}

sender.send((methods.get(0)).returnAddress(), encoder.encodeMethodCalls(methods.get(0).returnAddress(), methods));

call(methods);
}

}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void sendText(String message) {
}

@Override
public void openWebSocket(WebSocket webSocket) {
public void openWebSocket(WebSocket webSocket, Consumer<Exception> exceptionConsumer) {


final Buffer[] bufferRef = new Buffer[1];
Expand Down Expand Up @@ -428,30 +428,42 @@ public void openWebSocket(WebSocket webSocket) {
vertxWebSocket.closeHandler(event -> webSocket.onClose());

/* Handle on Exception. */
vertxWebSocket.exceptionHandler(event -> {

if (event instanceof ConnectException) {

logger.error("Unable to connect to " + host + " port " + port, event);
vertxWebSocket.exceptionHandler(error -> {
if (error instanceof ConnectException) {
logger.error("Unable to connect to " + host + " port " + port, error);
closed.set(true);
}

if (event instanceof Exception) {
webSocket.onError((Exception) event);
if (error instanceof Exception) {
exceptionConsumer.accept((Exception) error);
webSocket.onError((Exception) error);
} else {
webSocket.onError(new Exception(event));

exceptionConsumer.accept(new Exception(error));
webSocket.onError(new Exception(error));
}
});

/* Handle onOpen. */
webSocket.onOpen();

}, error -> {
if (error instanceof ConnectException) {
logger.error("Unable to connect to " + host + " port " + port, error);
closed.set(true);
}
if (error instanceof Exception) {
exceptionConsumer.accept((Exception) error);
webSocket.onError((Exception) error);
} else {
exceptionConsumer.accept(new Exception(error));
webSocket.onError(new Exception(error));
}
});
}

@Override
public void open(NetSocket netSocket) {
openWebSocket((WebSocket) netSocket);
public void open(NetSocket netSocket, Consumer<Exception> ex) {
openWebSocket((WebSocket) netSocket, ex);
}

@Override
Expand Down Expand Up @@ -544,6 +556,7 @@ private void connect(final Runnable runnable) {

httpClient = vertx.createHttpClient(httpClientOptions);


if (debug) logger.debug("HTTP CLIENT: connect:: \nhost {} \nport {}\n", host, port);
closed.set(false);
Sys.sleep(100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public void testWebSocket() {
}
);

webSocket.open();
webSocket.open(e -> {

});


client.flush();
Expand Down
Loading

0 comments on commit fce7f6f

Please sign in to comment.