Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add socket read timeout implementation #15261

Merged
merged 11 commits into from
May 13, 2019
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ All notable changes to this project will be documented in this file. When sendin

### Standard Libraries

-
- [Add read timeout support for TCP and UDP sockets](https://github.com/ballerina-platform/ballerina-lang/pull/15261)
-
-
-
Expand Down
8 changes: 6 additions & 2 deletions stdlib/socket/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ dependencies {
baloCreat project(':lib-creator-milestone')
// implementation project(':ballerina-lang')
implementation project(':ballerina-core')
// implementation project(':lib-creator')
implementation project(':lib-creator')

testCompile project(':ballerina-launcher')
testCompile project(':ballerina-runtime-api')
testCompile project(':ballerina-builtin')
testCompile project(':ballerina-utils')
testCompile project(':ballerina-io')
testCompile project(':ballerina-log-api')
testCompile 'org.testng:testng'
testCompile 'org.awaitility:awaitility'
testCompile 'org.slf4j:slf4j-jdk14'

baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-runtime-api', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-io', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-log-api', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation')
}

Expand All @@ -51,4 +53,6 @@ test {
useTestNG() {
suites 'src/test/resources/testng.xml'
}
systemProperty "java.util.logging.config.file", "src/test/resources/logging.properties"
systemProperty "java.util.logging.manager", "org.ballerinalang.logging.BLogManager"
}
12 changes: 12 additions & 0 deletions stdlib/socket/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@
<classifier>ballerina-binary-repo</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-log-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-log-api</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
2 changes: 2 additions & 0 deletions stdlib/socket/src/main/ballerina/socket/client_socket.bal
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ public type Client client object {
#
# + host - Target service URL
# + port - Port number of the remote service
# + readTimeout - Socket read timeout value to be used in milliseconds. Default is 300000 milliseconds (5 minutes)
# + callbackService - The callback service for the client. Resources in this service gets called on receipt of messages from the server.
public type ClientConfig record {|
string host;
int port;
int readTimeout = 300000;
service callbackService?;
|};
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public type Listener object {
# Represents the socket server configuration.
#
# + interface - the interface that server with to bind
# + readTimeout - Socket read timeout value to be used in milliseconds. Default is 300000 milliseconds (5 minutes)
public type ListenerConfig record {|
string? interface = ();
int readTimeout = 300000;
|};
15 changes: 12 additions & 3 deletions stdlib/socket/src/main/ballerina/socket/udp_client_socket.bal
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ public type UdpClient client object {
# Initialize the UDP client based on provided configuration.
#
# + localAddress - Locally binding interface and port
public function __init(Address? localAddress = ()) {
var initResult = self.initEndpoint(localAddress);
# + config - Configure additional details like read timeout etc.
public function __init(Address? localAddress = (), UdpClientConfig? config = ()) {
UdpClientConfig configuration = config ?: {};
var initResult = self.initEndpoint(localAddress, configuration);
if (initResult is error) {
panic initResult;
}
Expand All @@ -40,7 +42,7 @@ public type UdpClient client object {
}
}

function initEndpoint(Address? localAddress) returns error? = external;
function initEndpoint(Address? localAddress, UdpClientConfig config) returns error? = external;

# Send given data to the specified remote client.
#
Expand Down Expand Up @@ -72,3 +74,10 @@ public type Address record {|
string host?;
int port;
|};

# Configuration for UDP client.
#
# + readTimeout - Socket read timeout value to be used in milliseconds. Default is 300000 milliseconds (5 minutes)
public type UdpClientConfig record {|
int readTimeout = 300000;
|};
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ private SocketConstants() {

// If default length pass as the read length then the entire buffer read.
public static final int DEFAULT_EXPECTED_READ_LENGTH = -100;

// Default read timeout set as 5 min.
public static final long DEFAULT_READ_TIMEOUT_VALUE = 1000 * 60 * 5;
public static final String READ_TIMEOUT = "readTimeout";
public static final String CONFIG_READ_TIMEOUT = "b7a.socket.read.timeout";

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.ballerinalang.connector.api.Service;
import org.ballerinalang.connector.api.Struct;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
Expand All @@ -43,6 +44,7 @@
import static org.ballerinalang.stdlib.socket.SocketConstants.CLIENT_CONFIG;
import static org.ballerinalang.stdlib.socket.SocketConstants.CLIENT_SERVICE_CONFIG;
import static org.ballerinalang.stdlib.socket.SocketConstants.IS_CLIENT;
import static org.ballerinalang.stdlib.socket.SocketConstants.READ_TIMEOUT;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_KEY;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_PACKAGE;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_SERVICE;
Expand Down Expand Up @@ -83,8 +85,10 @@ public void execute(Context context) {
if (service != null) {
resourceMap = SocketUtils.getResourceRegistry(service);
}
clientEndpoint.addNativeData(SOCKET_SERVICE, new SocketService(socketChannel, resourceMap));
clientEndpoint.addNativeData(CLIENT_CONFIG, endpointConfig);
final BValue readTimeoutBValue = endpointConfig.get(READ_TIMEOUT);
long timeout = ((BInteger) readTimeoutBValue).intValue();
clientEndpoint.addNativeData(SOCKET_SERVICE, new SocketService(socketChannel, resourceMap, timeout));
context.setReturnValues();
} catch (SocketException e) {
context.setReturnValues(SocketUtils.createSocketError(context, "Unable to bind the local socket port"));
Expand All @@ -93,6 +97,4 @@ public void execute(Context context) {
context.setReturnValues(SocketUtils.createSocketError(context, "Unable to initiate the socket"));
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ public void execute(Context context, CallableUnitCallback callback) {
callback.notifyFailure(SocketUtils.createSocketError(context, msg));
return;
}
SocketService socketService = (SocketService) clientEndpoint.getNativeData(SocketConstants.SOCKET_SERVICE);
SocketChannel socketChannel = (SocketChannel) clientEndpoint.getNativeData(SocketConstants.SOCKET_KEY);
final ReadPendingCallback readPendingCallback = new ReadPendingCallback(context, callback, expectedLength);
int socketHash = socketChannel.hashCode();
ReadPendingCallback readPendingCallback = new ReadPendingCallback(context, callback, expectedLength, socketHash,
socketService.getReadTimeout());
ReadPendingSocketMap.getInstance().add(socketChannel.hashCode(), readPendingCallback);
SocketService socketService = (SocketService) clientEndpoint.getNativeData(SocketConstants.SOCKET_SERVICE);
log.debug("Notify to invokeRead");
SelectorManager.getInstance().invokeRead(socketChannel.hashCode(), socketService.getResources() != null);
SelectorManager.getInstance().invokeRead(socketHash, socketService.getResources() != null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ballerinalang.connector.api.BLangConnectorSPIUtil;
import org.ballerinalang.connector.api.Struct;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
Expand All @@ -37,6 +38,7 @@

import static org.ballerinalang.stdlib.socket.SocketConstants.CONFIG_FIELD_PORT;
import static org.ballerinalang.stdlib.socket.SocketConstants.LISTENER_CONFIG;
import static org.ballerinalang.stdlib.socket.SocketConstants.READ_TIMEOUT;
import static org.ballerinalang.stdlib.socket.SocketConstants.SERVER_SOCKET_KEY;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_PACKAGE;

Expand Down Expand Up @@ -67,6 +69,9 @@ public void execute(Context context) {
serviceEndpoint.addNativeData(LISTENER_CONFIG, endpointConfig);
int port = (int) context.getIntArgument(0);
serviceEndpoint.addNativeData(CONFIG_FIELD_PORT, port);
final BValue readTimeoutBValue = endpointConfig.get(READ_TIMEOUT);
long timeout = ((BInteger) readTimeoutBValue).intValue();
serviceEndpoint.addNativeData(READ_TIMEOUT, timeout);
} catch (SocketException e) {
context.setReturnValues(SocketUtils.createSocketError(context, "Unable to bind the socket port"));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.channels.ServerSocketChannel;
import java.util.Map;

import static org.ballerinalang.stdlib.socket.SocketConstants.READ_TIMEOUT;
import static org.ballerinalang.stdlib.socket.SocketConstants.SERVER_SOCKET_KEY;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_PACKAGE;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_SERVICE;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void execute(Context context) {
private SocketService getSocketService(Context context, Struct listenerEndpoint) {
Map<String, Resource> resources = getResourceMap(context);
ServerSocketChannel serverSocket = (ServerSocketChannel) listenerEndpoint.getNativeData(SERVER_SOCKET_KEY);
return new SocketService(serverSocket, resources);
long timeout = (long) listenerEndpoint.getNativeData(READ_TIMEOUT);
return new SocketService(serverSocket, resources, timeout);
}

private Map<String, Resource> getResourceMap(Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import static java.nio.channels.SelectionKey.OP_READ;
import static org.ballerinalang.stdlib.socket.SocketConstants.IS_CLIENT;
import static org.ballerinalang.stdlib.socket.SocketConstants.READ_TIMEOUT;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_KEY;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_PACKAGE;
import static org.ballerinalang.stdlib.socket.SocketConstants.SOCKET_SERVICE;
Expand Down Expand Up @@ -87,7 +88,9 @@ public void execute(Context context, CallableUnitCallback callback) {
socketChannel.bind(new InetSocketAddress(host.stringValue(), (int) port.intValue()));
}
}
socketService = new SocketService(socketChannel, null);
BMap<String, BValue> configs = (BMap<String, BValue>) context.getNullableRefArgument(2);
long timeout = ((BInteger) configs.get(READ_TIMEOUT)).intValue();
socketService = new SocketService(socketChannel, null, timeout);
clientEndpoint.addNativeData(SOCKET_SERVICE, socketService);
selectorManager = SelectorManager.getInstance();
selectorManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.ballerinalang.stdlib.socket.tcp.ReadPendingCallback;
import org.ballerinalang.stdlib.socket.tcp.ReadPendingSocketMap;
import org.ballerinalang.stdlib.socket.tcp.SelectorManager;
import org.ballerinalang.stdlib.socket.tcp.SocketService;
import org.ballerinalang.stdlib.socket.tcp.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,10 +66,13 @@ public void execute(Context context, CallableUnitCallback callback) {
return;
}
DatagramChannel socket = (DatagramChannel) clientEndpoint.getNativeData(SocketConstants.SOCKET_KEY);
final ReadPendingCallback readPendingCallback = new ReadPendingCallback(context, callback, expectedLength);
int socketHash = socket.hashCode();
SocketService socketService = (SocketService) clientEndpoint.getNativeData(SocketConstants.SOCKET_SERVICE);
ReadPendingCallback readPendingCallback = new ReadPendingCallback(context, callback, expectedLength, socketHash,
socketService.getReadTimeout());
ReadPendingSocketMap.getInstance().add(socket.hashCode(), readPendingCallback);
log.debug("Notify to invokeRead");
SelectorManager.getInstance().invokeRead(socket.hashCode(), false);
SelectorManager.getInstance().invokeRead(socketHash, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.model.values.BError;

import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;

/**
* This will hold information related to the pending read actions.
Expand All @@ -35,11 +38,18 @@ public class ReadPendingCallback {
private final int expectedLength;
private int currentLength;
private ByteBuffer buffer;
private int socketHash;
private Timer timer;
private long timeout;

public ReadPendingCallback(Context context, CallableUnitCallback callback, int expectedLength) {
public ReadPendingCallback(Context context, CallableUnitCallback callback, int expectedLength, int socketHash,
long timeout) {
this.context = context;
this.callback = callback;
this.expectedLength = expectedLength;
this.socketHash = socketHash;
this.timeout = timeout;
scheduleTimeout(timeout);
}

public Context getContext() {
Expand Down Expand Up @@ -69,4 +79,41 @@ public ByteBuffer getBuffer() {
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}

private void scheduleTimeout(long timeout) {
timer = getTimer();
timer.schedule(getTimerTask(), timeout);
}

/**
* Reset the timer to original time. This will create a new timer instance and start the count down.
*/
public void resetTimeout() {
timer.cancel();
timer = getTimer();
timer.schedule(getTimerTask(), this.timeout);
}

/**
* Cancel already running timer.
*/
public void cancelTimeout() {
timer.cancel();
}

private Timer getTimer() {
return new Timer("B7aSocketTimeoutTimer");
}

private TimerTask getTimerTask() {
return new TimerTask() {
public void run() {
ReadPendingSocketMap.getInstance().remove(socketHash);
final BError timeoutError = SocketUtils.createSocketError(context, "Read timed out");
context.setReturnValues(timeoutError);
callback.notifySuccess();
cancel();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

package org.ballerinalang.stdlib.socket.tcp;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This map will hold all the pending read action that coming through
Expand All @@ -28,7 +27,7 @@
* @since 0.995.0
*/
public class ReadPendingSocketMap {
private Map<Integer, ReadPendingCallback> queue = new HashMap<>();
private ConcurrentHashMap<Integer, ReadPendingCallback> queue = new ConcurrentHashMap<>();

private ReadPendingSocketMap() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

package org.ballerinalang.stdlib.socket.tcp;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This map will hold all the read ready sockets until it is read through the
Expand All @@ -28,7 +27,7 @@
* @since 0.995.0
*/
public class ReadReadySocketMap {
private Map<Integer, SocketReader> socketReaderMap = new HashMap<>();
private ConcurrentHashMap<Integer, SocketReader> socketReaderMap = new ConcurrentHashMap<>();

private ReadReadySocketMap() {
}
Expand Down
Loading