From 50632310ddb7391a22826d4d6792fd80ce36f070 Mon Sep 17 00:00:00 2001 From: Jacob G Date: Sat, 11 Aug 2018 15:45:56 -0400 Subject: [PATCH] Outgoing packets are now combined into a single packet before flushing. --- README.md | 4 +- pom.xml | 5 +- src/main/java/simplenet/Client.java | 434 ++++++++++----------- src/main/java/simplenet/packet/Packet.java | 41 +- 4 files changed, 222 insertions(+), 262 deletions(-) diff --git a/README.md b/README.md index ebb087d..b17d252 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,13 @@ An easy-to-use, event-driven, asynchronous, network application framework. com.github.jhg023 SimpleNet - 1.1.9 + 1.2.0 ``` Gradle: - compile 'com.github.jhg023:SimpleNet:1.1.9' + compile 'com.github.jhg023:SimpleNet:1.2.0' 2. To create a `Client`, you can use the following: ```java diff --git a/pom.xml b/pom.xml index c36516a..82d2a8a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,12 +11,13 @@ com.github.jhg023 SimpleNet - 1.1.9 + 1.2.0 jar SimpleNet An easy-to-use, event-driven, asynchronous, - network application framework. + network application framework. + https://github.com/jhg023/SimpleNet diff --git a/src/main/java/simplenet/Client.java b/src/main/java/simplenet/Client.java index 490f8f6..c63c984 100644 --- a/src/main/java/simplenet/Client.java +++ b/src/main/java/simplenet/Client.java @@ -36,8 +36,7 @@ public class Client extends Receiver implements Channeled { /** - * The {@link CompletionHandler} used to process bytes - * when they are received by this {@link Client}. + * The {@link CompletionHandler} used to process bytes when they are received by this {@link Client}. */ static class Listener implements CompletionHandler { @@ -121,8 +120,8 @@ public void failed(Throwable t, Client client) { } /** - * The {@link CompletionHandler} used when this {@link Client} - * sends one or more {@link Packet}s to a {@link Server}. + * The {@link CompletionHandler} used when this {@link Client} sends one or more {@link Packet}s + * to a {@link Server}. */ private static final CompletionHandler PACKET_HANDLER = new CompletionHandler() { @Override @@ -148,16 +147,15 @@ public void failed(Throwable t, Client client) { t.printStackTrace(); } }; - - /** - * A thread-safe method of keeping track whether this {@link Client} - * is currently writing data to the network. - */ - private final AtomicBoolean writing; /** - * The {@link ByteBuffer} that will hold data - * sent by the {@link Client} or {@link Server}. + * A thread-safe method of keeping track whether this {@link Client} is currently writing data + * to the network. + */ + private final AtomicBoolean writing; + + /** + * The {@link ByteBuffer} that will hold data sent by the {@link Client} or {@link Server}. */ private final ByteBuffer buffer; @@ -167,14 +165,12 @@ public void failed(Throwable t, Client client) { private final AsynchronousSocketChannel channel; /** - * Whether or not new elements added {@code queue} - * should be added to the front rather than the back. + * Whether or not new elements added {@code queue} should be added to the front rather than the back. */ private boolean prepend; /** - * The amount of readable {@code byte}s that currently exist - * within this {@link Client}'s {@code buffer}. + * The amount of readable {@code byte}s that currently exist within this {@link Client}'s {@code buffer}. */ private int size; @@ -191,39 +187,35 @@ public void failed(Throwable t, Client client) { /** * A {@link Queue} to manage outgoing {@link Packet}s. */ - private final Queue outgoingPackets; + private final Queue outgoingPackets; /** - * A {@link Queue} to manage {@link Packet}s that should be - * flushed as soon as possible. + * A {@link Queue} to manage {@link Packet}s that should be flushed as soon as possible. */ private final Queue packetsToFlush; /** - * The {@link Deque} that keeps track of nested calls - * to {@link Client#read(int, Consumer)} and assures that they - * will complete in the expected order. + * The {@link Deque} that keeps track of nested calls to {@link Client#read(int, Consumer)} + * and assures that they will complete in the expected order. */ private final Deque>> stack; /** - * The {@link Deque} used when requesting a certain - * amount of bytes from the {@link Client} or {@link Server}. + * The {@link Deque} used when requesting a certain amount of bytes from the {@link Client} or + * {@link Server}. */ private final Deque>> queue; /** - * Instantiates a new {@link Client} by attempting - * to open the backing {@link AsynchronousSocketChannel} - * with a default buffer size of {@code 4096} bytes. + * Instantiates a new {@link Client} by attempting to open the backing + * {@link AsynchronousSocketChannel} with a default buffer size of {@code 4096} bytes. */ public Client() { this(4096); } /** - * Instantiates a new {@link Client} by attempting - * to open the backing {@link AsynchronousSocketChannel} + * Instantiates a new {@link Client} by attempting to open the backing {@link AsynchronousSocketChannel} * with a provided buffer size in bytes. * * @param bufferSize The size of this {@link Client}'s buffer, in bytes. @@ -233,8 +225,7 @@ public Client(int bufferSize) { } /** - * Instantiates a new {@link Client} with an existing - * {@link AsynchronousSocketChannel} with a provided + * Instantiates a new {@link Client} with an existing {@link AsynchronousSocketChannel} with a provided * buffer size in bytes. * * @param bufferSize The size of this {@link Client}'s buffer, in bytes. @@ -293,20 +284,20 @@ public final void connect(String address, int port) { connect(address, port, 30L, TimeUnit.SECONDS, () -> { System.err.println("Couldn't connect within 30 seconds!"); }); - } - - /** - * Attempts to connect to a {@link Server} with the specified {@code address} and {@code port} - * and a specified timeout. If the timeout is reached, then the {@link Runnable} is run and - * the backing {@link AsynchronousSocketChannel} is closed. - * - * @param address The IP address to connect to. - * @param port The port to connect to {@code 0 <= port <= 65535}. - * @param timeout The timeout value. - * @param unit The timeout unit. - * @param onTimeout The {@link Runnable} that runs if this connection attempt times out. - */ - public final void connect(String address, int port, long timeout, TimeUnit unit, Runnable onTimeout) { + } + + /** + * Attempts to connect to a {@link Server} with the specified {@code address} and {@code port} + * and a specified timeout. If the timeout is reached, then the {@link Runnable} is run and + * the backing {@link AsynchronousSocketChannel} is closed. + * + * @param address The IP address to connect to. + * @param port The port to connect to {@code 0 <= port <= 65535}. + * @param timeout The timeout value. + * @param unit The timeout unit. + * @param onTimeout The {@link Runnable} that runs if this connection attempt times out. + */ + public final void connect(String address, int port, long timeout, TimeUnit unit, Runnable onTimeout) { Objects.requireNonNull(address); if (port < 0 || port > 65535) { @@ -321,7 +312,7 @@ public final void connect(String address, int port, long timeout, TimeUnit unit, throw new IllegalStateException("This receiver is already connected!"); } catch (ExecutionException e) { e.printStackTrace(); - } catch (Exception e) { + } catch (Exception e) { onTimeout.run(); close(); } @@ -334,14 +325,12 @@ public void close() { } /** - * Registers a listener that fires when a {@link Client} - * disconnects from a {@link Server}. + * Registers a listener that fires when a {@link Client} disconnects from a {@link Server}. *

- * This listener is able to be used by both the {@link Client} - * and {@link Server}, but can be independent of one-another. + * This listener is able to be used by both the {@link Client} and {@link Server}, but can + * be independent of one-another. *

- * When calling this method more than once, multiple listeners - * are registered. + * Calling this method more than once registers multiple listeners. * * @param listener A {@link Runnable}. */ @@ -350,8 +339,8 @@ public void onDisconnect(Runnable listener) { } /** - * Requests {@code n} bytes and accepts a {@link Consumer} - * holding the {@code n} bytes once received. + * Requests {@code n} bytes and accepts a {@link Consumer} holding the {@code n} + * bytes once received. * * @param n The number of bytes requested. * @param consumer A {@link Consumer}. @@ -371,11 +360,9 @@ public final void read(int n, Consumer consumer) { } /** - * Calls {@link #read(int, Consumer)}, however once - * finished, {@link #read(int, Consumer)} is called once - * again with the same parameters; this loops indefinitely - * whereas {@link #read(int, Consumer)} completes after - * a single iteration. + * Calls {@link #read(int, Consumer)}, however once finished, {@link #read(int, Consumer)} + * is called once again with the same parameters; this loops indefinitely whereas + * {@link #read(int, Consumer)} completes after a single iteration. * * @param n The number of bytes requested. * @param consumer Holds the operations that should be performed once @@ -390,37 +377,37 @@ public void accept(ByteBuffer buffer) { } }); } - - /** - * A helper method to block until the {@link CompletableFuture} contains a value. - * - * @param future The {@link CompletableFuture} to wait for. - * @param The type of the {@link CompletableFuture} and the return type. - * @return The instance of {@code T} contained in the {@link CompletableFuture}. - */ - private T read(CompletableFuture future) { - try { - return future.get(); - } catch (Exception e) { - throw new IllegalStateException("Blocking operation was cancelled!"); - } - } - - /** - * Reads a {@code byte} from the network, but blocks the executing thread - * unlike {@link #readByte(Consumer)}. - * - * @return A {@code byte}. - */ - public final byte readByte() { + + /** + * A helper method to block until the {@link CompletableFuture} contains a value. + * + * @param future The {@link CompletableFuture} to wait for. + * @param The type of the {@link CompletableFuture} and the return type. + * @return The instance of {@code T} contained in the {@link CompletableFuture}. + */ + private T read(CompletableFuture future) { + try { + return future.get(); + } catch (Exception e) { + throw new IllegalStateException("Blocking operation was cancelled!"); + } + } + + /** + * Reads a {@code byte} from the network, but blocks the executing thread unlike + * {@link #readByte(Consumer)}. + * + * @return A {@code byte}. + */ + public final byte readByte() { CompletableFuture future = new CompletableFuture<>(); - readByte(future::complete); + readByte(future::complete); return read(future); } /** - * Requests a single {@code byte} and accepts a {@link Consumer} - * with the {@code byte} when it is received. + * Requests a single {@code byte} and accepts a {@link Consumer} with the {@code byte} + * when it is received. * * @param consumer A {@link Consumer}. */ @@ -429,33 +416,31 @@ public final void readByte(Consumer consumer) { } /** - * Calls {@link #readByte(Consumer)}, however once - * finished, {@link #readByte(Consumer)} is called once - * again with the same parameter; this loops indefinitely - * whereas {@link #readByte(Consumer)} completes after - * a single iteration. + * Calls {@link #readByte(Consumer)}, however once finished, {@link #readByte(Consumer)} + * is called once again with the same parameter; this loops indefinitely whereas + * {@link #readByte(Consumer)} completes after a single iteration. * * @param consumer A {@link Consumer}. */ public final void readByteAlways(Consumer consumer) { readAlways(Byte.BYTES, buffer -> consumer.accept(buffer.get())); - } - - /** - * Reads a {@code char} from the network, but blocks the executing thread - * unlike {@link #readChar(Consumer)}. - * - * @return A {@code char}. - */ - public final char readChar() { - CompletableFuture future = new CompletableFuture<>(); - readChar(future::complete); - return read(future); - } - - /** - * Requests a single {@code char} and accepts a {@link Consumer} - * with the {@code char} when it is received. + } + + /** + * Reads a {@code char} from the network, but blocks the executing thread unlike + * {@link #readChar(Consumer)}. + * + * @return A {@code char}. + */ + public final char readChar() { + CompletableFuture future = new CompletableFuture<>(); + readChar(future::complete); + return read(future); + } + + /** + * Requests a single {@code char} and accepts a {@link Consumer} with the {@code char} + * when it is received. * * @param consumer A {@link Consumer}. */ @@ -474,20 +459,20 @@ public final void readChar(Consumer consumer) { */ public final void readCharAlways(Consumer consumer) { readAlways(Character.BYTES, buffer -> consumer.accept(buffer.getChar())); - } - - /** - * Reads a {@code double} from the network, but blocks the executing thread - * unlike {@link #readDouble(DoubleConsumer)}. - * - * @return A {@code double}. - */ - public final double readDouble() { - CompletableFuture future = new CompletableFuture<>(); - readDouble(future::complete); - return read(future); - } - + } + + /** + * Reads a {@code double} from the network, but blocks the executing thread + * unlike {@link #readDouble(DoubleConsumer)}. + * + * @return A {@code double}. + */ + public final double readDouble() { + CompletableFuture future = new CompletableFuture<>(); + readDouble(future::complete); + return read(future); + } + /** * Requests a single {@code double} and accepts a {@link Consumer} * with the {@code double} when it is received. @@ -509,20 +494,20 @@ public final void readDouble(DoubleConsumer consumer) { */ public final void readDoubleAlways(DoubleConsumer consumer) { readAlways(Double.BYTES, buffer -> consumer.accept(buffer.getDouble())); - } - - /** - * Reads a {@code float} from the network, but blocks the executing thread - * unlike {@link #readFloat(Consumer)}. - * - * @return A {@code float}. - */ - public final float readFloat() { - CompletableFuture future = new CompletableFuture<>(); - readFloat(future::complete); - return read(future); - } - + } + + /** + * Reads a {@code float} from the network, but blocks the executing thread + * unlike {@link #readFloat(Consumer)}. + * + * @return A {@code float}. + */ + public final float readFloat() { + CompletableFuture future = new CompletableFuture<>(); + readFloat(future::complete); + return read(future); + } + /** * Requests a single {@code float} and accepts a {@link Consumer} * with the {@code float} when it is received. @@ -543,21 +528,21 @@ public final void readFloat(Consumer consumer) { * @param consumer A {@link Consumer}. */ public final void readFloatAlways(Consumer consumer) { - readAlways(Float.BYTES, buffer -> consumer.accept(buffer.getFloat())); - } - - /** - * Reads an {@code int} from the network, but blocks the executing thread - * unlike {@link #readInt(IntConsumer)}. - * - * @return An {@code int}. - */ - public final int readInt() { - CompletableFuture future = new CompletableFuture<>(); - readInt(future::complete); - return read(future); - } - + readAlways(Float.BYTES, buffer -> consumer.accept(buffer.getFloat())); + } + + /** + * Reads an {@code int} from the network, but blocks the executing thread + * unlike {@link #readInt(IntConsumer)}. + * + * @return An {@code int}. + */ + public final int readInt() { + CompletableFuture future = new CompletableFuture<>(); + readInt(future::complete); + return read(future); + } + /** * Requests a single {@code int} and accepts a {@link Consumer} * with the {@code int} when it is received. @@ -578,21 +563,21 @@ public final void readInt(IntConsumer consumer) { * @param consumer An {@link IntConsumer}. */ public final void readIntAlways(IntConsumer consumer) { - readAlways(Integer.BYTES, buffer -> consumer.accept(buffer.getInt())); - } - - /** - * Reads a {@code long} from the network, but blocks the executing thread - * unlike {@link #readLong(LongConsumer)}. - * - * @return A {@code long}. - */ - public final long readLong() { - CompletableFuture future = new CompletableFuture<>(); - readLong(future::complete); - return read(future); - } - + readAlways(Integer.BYTES, buffer -> consumer.accept(buffer.getInt())); + } + + /** + * Reads a {@code long} from the network, but blocks the executing thread + * unlike {@link #readLong(LongConsumer)}. + * + * @return A {@code long}. + */ + public final long readLong() { + CompletableFuture future = new CompletableFuture<>(); + readLong(future::complete); + return read(future); + } + /** * Requests a single {@code long} and accepts a {@link Consumer} * with the {@code long} when it is received. @@ -613,21 +598,21 @@ public final void readLong(LongConsumer consumer) { * @param consumer A {@link LongConsumer}. */ public final void readLongAlways(LongConsumer consumer) { - readAlways(Long.BYTES, buffer -> consumer.accept(buffer.getLong())); - } - - /** - * Reads a {@code short} from the network, but blocks the executing thread - * unlike {@link #readShort(Consumer)}. - * - * @return A {@code short}. - */ - public final short readShort() { - CompletableFuture future = new CompletableFuture<>(); - readShort(future::complete); - return read(future); - } - + readAlways(Long.BYTES, buffer -> consumer.accept(buffer.getLong())); + } + + /** + * Reads a {@code short} from the network, but blocks the executing thread + * unlike {@link #readShort(Consumer)}. + * + * @return A {@code short}. + */ + public final short readShort() { + CompletableFuture future = new CompletableFuture<>(); + readShort(future::complete); + return read(future); + } + /** * Requests a single {@code short} and accepts a {@link Consumer} * with the {@code short} when it is received. @@ -648,21 +633,21 @@ public final void readShort(Consumer consumer) { * @param consumer A {@link Consumer}. */ public final void readShortAlways(Consumer consumer) { - readAlways(Short.BYTES, buffer -> consumer.accept(buffer.getShort())); - } - - /** - * Reads a {@link String} from the network, but blocks the executing thread - * unlike {@link #readString(Consumer)}. - * - * @return A {@code String}. - */ - public final String readString() { - CompletableFuture future = new CompletableFuture<>(); - readString(future::complete); - return read(future); - } - + readAlways(Short.BYTES, buffer -> consumer.accept(buffer.getShort())); + } + + /** + * Reads a {@link String} from the network, but blocks the executing thread + * unlike {@link #readString(Consumer)}. + * + * @return A {@code String}. + */ + public final String readString() { + CompletableFuture future = new CompletableFuture<>(); + readString(future::complete); + return read(future); + } + /** * Requests a single {@link String} and accepts a {@link Consumer} * with the {@link String} when it is received. A {@code short} @@ -707,54 +692,39 @@ public final void readStringAlways(Consumer consumer) { * it is called again. */ public final synchronized void flush() { - for (int i = 0; i < outgoingPackets.size(); i++) { - ByteBuffer raw = outgoingPackets.poll(); - - if (encryption != null) { - try { - encryption.update(raw, raw.duplicate()); - raw.flip(); - } catch (Exception e) { - throw new IllegalStateException("Exception occurred when encrypting:", e); - } - } + ByteBuffer raw = ByteBuffer.allocateDirect(outgoingPackets.stream().mapToInt(Packet::getSize).sum()); - if (!writing.getAndSet(true)) { - channel.write(raw, this, PACKET_HANDLER); - } else { - packetsToFlush.offer(raw); - } + Packet packet; + + while ((packet = outgoingPackets.poll()) != null) { + packet.getQueue().forEach(consumer -> consumer.accept(raw)); } - } - /** - * Gets the {@link Deque} that holds information - * regarding requested bytes by this {@link Client}. - * - * @return A {@link Deque}. - */ - private Deque>> getQueue() { - return queue; - } + raw.flip(); - /** - * Gets the {@link Deque} that keeps track of nested - * calls to {@link Client#read(int, Consumer)}. - * - * @return A {@link Deque}. - */ - private Deque>> getStack() { - return stack; + if (encryption != null) { + try { + encryption.update(raw, raw.duplicate()); + raw.flip(); + } catch (Exception e) { + throw new IllegalStateException("Exception occurred when encrypting:", e); + } + } + + if (!writing.getAndSet(true)) { + channel.write(raw, this, PACKET_HANDLER); + } else { + packetsToFlush.offer(raw); + } } /** - * Gets the {@link Queue} that manages outgoing - * {@link Packet}s before writing them to the + * Gets the {@link Queue} that manages outgoing {@link Packet}s before writing them to the * {@link Channel}. * * @return A {@link Queue}. */ - public Queue getOutgoingPackets() { + public Queue getOutgoingPackets() { return outgoingPackets; } diff --git a/src/main/java/simplenet/packet/Packet.java b/src/main/java/simplenet/packet/Packet.java index 2e5b395..26690d7 100644 --- a/src/main/java/simplenet/packet/Packet.java +++ b/src/main/java/simplenet/packet/Packet.java @@ -208,11 +208,9 @@ public Packet putString(String s) { * or more of the headers depend on the size of the data * contained within the {@link Packet} itself. * - * @param runnable - * The {@link Runnable} containing calls to add - * more data to this {@link Packet}. - * @return - * The {@link Packet} to allow for chained writes. + * @param runnable The {@link Runnable} containing calls to add + * more data to this {@link Packet}. + * @return The {@link Packet} to allow for chained writes. */ public Packet prepend(Runnable runnable) { prepend = true; @@ -221,19 +219,6 @@ public Packet prepend(Runnable runnable) { return this; } - /** - * Builds this {@link Packet}'s data into a {@link ByteBuffer} - * for use in {@link #write(Client...)} and {@link #writeAndFlush(Client...)}. - * - * @return - * A {@link ByteBuffer}. - */ - private ByteBuffer build() { - ByteBuffer buffer = ByteBuffer.allocateDirect(size); - queue.forEach(consumer -> consumer.accept(buffer)); - return (ByteBuffer) buffer.flip(); - } - /** * Queues this {@link Packet} to one (or more) {@link Client}(s). *

@@ -247,10 +232,8 @@ public final void write(Client... clients) { throw new IllegalArgumentException("You must send this packet to at least one channel!"); } - ByteBuffer payload = build(); - for (Client client : clients) { - client.getOutgoingPackets().offer(payload); + client.getOutgoingPackets().offer(this); } } @@ -266,10 +249,8 @@ public final void writeAndFlush(Client... clients) { throw new IllegalArgumentException("You must send this packet to at least one channel!"); } - ByteBuffer payload = build(); - for (Client client : clients) { - client.getOutgoingPackets().offer(payload); + client.getOutgoingPackets().offer(this); client.flush(); } } @@ -277,11 +258,19 @@ public final void writeAndFlush(Client... clients) { /** * Gets the number of bytes in this {@link Packet}'s payload. * - * @return - * The current size of this {@link Packet} measured in bytes. + * @return The current size of this {@link Packet} measured in bytes. */ public int getSize() { return size; } + /** + * Gets the backing queue of this {@link Packet}. + * + * @return A {@link Deque>}. + */ + public Deque> getQueue() { + return queue; + } + }