Skip to content

Commit

Permalink
[69] partial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mirage22 committed Oct 9, 2024
1 parent d34aae4 commit 3cfab5f
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package com.robo4j.socket.http.channel;

import com.robo4j.logging.SimpleLoggingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
Expand All @@ -29,26 +30,26 @@
* @author Miro Wengner (@miragemiko)
*/
public class AcceptSelectionKeyHandler implements SelectionKeyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(AcceptSelectionKeyHandler.class);
private final SelectionKey key;
private final int bufferCapacity;

private final SelectionKey key;
private final int bufferCapacity;
public AcceptSelectionKeyHandler(SelectionKey key, int bufferCapacity) {
this.key = key;
this.bufferCapacity = bufferCapacity;
}

public AcceptSelectionKeyHandler(SelectionKey key, int bufferCapacity) {
this.key = key;
this.bufferCapacity = bufferCapacity;
}

@Override
public SelectionKey handle() {
try {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
serverChannel.socket().setReceiveBufferSize(bufferCapacity);
channel.configureBlocking(false);
channel.register(key.selector(), SelectionKey.OP_READ);
} catch (Exception e) {
SimpleLoggingUtil.error(getClass(), "handle accept", e);
}
return key;
}
@Override
public SelectionKey handle() {
try {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
serverChannel.socket().setReceiveBufferSize(bufferCapacity);
channel.configureBlocking(false);
channel.register(key.selector(), SelectionKey.OP_READ);
} catch (Exception e) {
LOGGER.error("handle accept:{}", e.getMessage(), e);
}
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package com.robo4j.socket.http.channel;

import com.robo4j.logging.SimpleLoggingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
Expand All @@ -28,6 +29,7 @@
* @author Miro Wengner (@miragemiko)
*/
public class ConnectSelectionKeyHandler implements SelectionKeyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectSelectionKeyHandler.class);

private final SelectionKey key;

Expand All @@ -37,11 +39,11 @@ public ConnectSelectionKeyHandler(SelectionKey key) {

@Override
public SelectionKey handle() {
try{
try {
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
} catch (Exception e){
SimpleLoggingUtil.error(getClass(), "handle connect", e);
} catch (Exception e) {
LOGGER.error("handle connect:{}", e.getMessage(), e);
}
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.robo4j.socket.http.channel;

import com.robo4j.RoboContext;
import com.robo4j.logging.SimpleLoggingUtil;
import com.robo4j.socket.http.request.HttpResponseProcess;
import com.robo4j.socket.http.units.CodecRegistry;
import com.robo4j.socket.http.units.ServerContext;
import com.robo4j.socket.http.util.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.SelectionKey;
Expand All @@ -32,8 +33,8 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.robo4j.socket.http.util.ChannelUtils.handleSelectorHandler;
import static com.robo4j.socket.http.util.RoboHttpUtils.PROPERTY_CODEC_REGISTRY;
import static com.robo4j.socket.http.util.RoboHttpUtils.PROPERTY_BUFFER_CAPACITY;
import static com.robo4j.socket.http.util.RoboHttpUtils.PROPERTY_CODEC_REGISTRY;

/**
* Inbound context co
Expand All @@ -42,71 +43,72 @@
* @author Miro Wengner (@miragemiko)
*/
public class InboundHttpSocketChannelHandler implements ChannelHandler {

private final RoboContext context;
private final ServerContext serverContext;
private final Map<SelectionKey, HttpResponseProcess> outBuffers = new ConcurrentHashMap<>();
private ServerSocketChannel socketChannel;
private boolean active;

public InboundHttpSocketChannelHandler(RoboContext context, ServerContext serverContext) {
this.context = context;
this.serverContext = serverContext;
}

@Override
public void start() {
if (!active) {
active = true;
context.getScheduler().execute(() -> initSocketChannel(serverContext));
}
}

@Override
public void stop() {
try {
if (socketChannel != null && socketChannel.isOpen()) {
active = false;
socketChannel.close();
}
} catch (IOException e) {
SimpleLoggingUtil.error(getClass(), "server stop problem: ", e);
}
}

private void initSocketChannel(ServerContext serverContext) {
socketChannel = ChannelUtils.initServerSocketChannel(serverContext);
final SelectionKey key = ChannelUtils.registerSelectionKey(socketChannel);

final CodecRegistry codecRegistry = serverContext.getPropertySafe(CodecRegistry.class, PROPERTY_CODEC_REGISTRY);
final int bufferCapacity = serverContext.getPropertySafe(Integer.class, PROPERTY_BUFFER_CAPACITY);

while (active) {
int channelReady = ChannelUtils.getReadyChannelBySelectionKey(key);
if (channelReady == 0) {
continue;
}

Set<SelectionKey> selectedKeys = key.selector().selectedKeys();
Iterator<SelectionKey> selectedIterator = selectedKeys.iterator();

while (selectedIterator.hasNext()) {
final SelectionKey selectedKey = selectedIterator.next();

selectedIterator.remove();

if (selectedKey.isAcceptable()) {
handleSelectorHandler(new AcceptSelectionKeyHandler(selectedKey, bufferCapacity));
} else if (selectedKey.isConnectable()) {
handleSelectorHandler(new ConnectSelectionKeyHandler(selectedKey));
} else if (selectedKey.isReadable()) {
handleSelectorHandler(new ReadSelectionKeyHandler(context, serverContext, codecRegistry, outBuffers, selectedKey));
} else if (selectedKey.isWritable()) {
handleSelectorHandler(new WriteSelectionKeyHandler(context, serverContext, outBuffers, selectedKey));
}
}
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(InboundHttpSocketChannelHandler.class);

private final RoboContext context;
private final ServerContext serverContext;
private final Map<SelectionKey, HttpResponseProcess> outBuffers = new ConcurrentHashMap<>();
private ServerSocketChannel socketChannel;
private boolean active;

public InboundHttpSocketChannelHandler(RoboContext context, ServerContext serverContext) {
this.context = context;
this.serverContext = serverContext;
}

@Override
public void start() {
if (!active) {
active = true;
context.getScheduler().execute(() -> initSocketChannel(serverContext));
}
}

@Override
public void stop() {
try {
if (socketChannel != null && socketChannel.isOpen()) {
active = false;
socketChannel.close();
}
} catch (IOException e) {
LOGGER.error("server stop problem: {}", e.getMessage(), e);
}
}

private void initSocketChannel(ServerContext serverContext) {
socketChannel = ChannelUtils.initServerSocketChannel(serverContext);
final SelectionKey key = ChannelUtils.registerSelectionKey(socketChannel);

final CodecRegistry codecRegistry = serverContext.getPropertySafe(CodecRegistry.class, PROPERTY_CODEC_REGISTRY);
final int bufferCapacity = serverContext.getPropertySafe(Integer.class, PROPERTY_BUFFER_CAPACITY);

while (active) {
int channelReady = ChannelUtils.getReadyChannelBySelectionKey(key);
if (channelReady == 0) {
continue;
}

Set<SelectionKey> selectedKeys = key.selector().selectedKeys();
Iterator<SelectionKey> selectedIterator = selectedKeys.iterator();

while (selectedIterator.hasNext()) {
final SelectionKey selectedKey = selectedIterator.next();

selectedIterator.remove();

if (selectedKey.isAcceptable()) {
handleSelectorHandler(new AcceptSelectionKeyHandler(selectedKey, bufferCapacity));
} else if (selectedKey.isConnectable()) {
handleSelectorHandler(new ConnectSelectionKeyHandler(selectedKey));
} else if (selectedKey.isReadable()) {
handleSelectorHandler(new ReadSelectionKeyHandler(context, serverContext, codecRegistry, outBuffers, selectedKey));
} else if (selectedKey.isWritable()) {
handleSelectorHandler(new WriteSelectionKeyHandler(context, serverContext, outBuffers, selectedKey));
}
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.robo4j.socket.http.channel;

import com.robo4j.RoboContext;
import com.robo4j.logging.SimpleLoggingUtil;
import com.robo4j.socket.http.units.ClientContext;
import com.robo4j.socket.http.units.DatagramClientUnit;
import com.robo4j.socket.http.util.ChannelBufferUtils;
import com.robo4j.socket.http.util.ChannelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -32,59 +33,58 @@
* Inbound Datagram Handler for UDP client handles sending messages and
* receiving response
*
* @see DatagramClientUnit
*
* @author Marcus Hirt (@hirt)
* @author Miroslav Wengner (@miragemiko)
* @see DatagramClientUnit
*/
public class OutboundDatagramSocketChannelHandler implements ChannelHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(OutboundDatagramSocketChannelHandler.class);
private final RoboContext context;
private final ClientContext clientContext;
private final byte[] payload;
private DatagramChannel channel;
private volatile boolean active;

private final RoboContext context;
private final ClientContext clientContext;
private final byte[] payload;
private DatagramChannel channel;
private volatile boolean active;

public OutboundDatagramSocketChannelHandler(RoboContext context, ClientContext clientContext, byte[] payload) {
this.context = context;
this.clientContext = clientContext;
this.payload = payload;
}
public OutboundDatagramSocketChannelHandler(RoboContext context, ClientContext clientContext, byte[] payload) {
this.context = context;
this.clientContext = clientContext;
this.payload = payload;
}

@Override
public void start() {
if (!active) {
active = true;
context.getScheduler().execute(() -> initDatagramSocket(clientContext));
}
}
@Override
public void start() {
if (!active) {
active = true;
context.getScheduler().execute(() -> initDatagramSocket(clientContext));
}
}

@Override
public void stop() {
try {
if (channel != null) {
active = false;
if (channel.isConnected())
channel.close();
}
} catch (IOException e) {
SimpleLoggingUtil.error(getClass(), "server stop problem: ", e);
}
}
@Override
public void stop() {
try {
if (channel != null) {
active = false;
if (channel.isConnected())
channel.close();
}
} catch (IOException e) {
LOGGER.error("server stop problem: {}", e.getMessage(), e);
}
}

private void initDatagramSocket(ClientContext clientContext) {
channel = ChannelUtils.initDatagramChannel(DatagramConnectionType.CLIENT, clientContext);
final ByteBuffer buffer = ByteBuffer.allocateDirect(ChannelBufferUtils.INIT_BUFFER_CAPACITY);
final SocketAddress address = ChannelUtils.getSocketAddressByContext(clientContext);
// while (active.get()){
try {
buffer.clear();
buffer.put(payload);
buffer.flip();
channel.send(buffer, address);
} catch (Exception e) {
SimpleLoggingUtil.error(getClass(), "datagram problem: ", e);
}
private void initDatagramSocket(ClientContext clientContext) {
channel = ChannelUtils.initDatagramChannel(DatagramConnectionType.CLIENT, clientContext);
final ByteBuffer buffer = ByteBuffer.allocateDirect(ChannelBufferUtils.INIT_BUFFER_CAPACITY);
final SocketAddress address = ChannelUtils.getSocketAddressByContext(clientContext);
// while (active.get()){
try {
buffer.clear();
buffer.put(payload);
buffer.flip();
channel.send(buffer, address);
} catch (Exception e) {
LOGGER.error("datagram problem:{}", e.getMessage(), e);
}

}
}
}

0 comments on commit 3cfab5f

Please sign in to comment.