Skip to content

Commit

Permalink
Merge pull request #59 from beverly-hills-money-gangster/feature/asyn…
Browse files Browse the repository at this point in the history
…c_client_connection

Async client connection
  • Loading branch information
beverly-hills-money-gangster authored Apr 17, 2024
2 parents 0ead294 + e75dc12 commit 405a914
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 52 deletions.
8 changes: 4 additions & 4 deletions net-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
<parent>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>daikombat-server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<artifactId>net-client</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>

<properties>
<maven.compiler.source>14</maven.compiler.source>
Expand Down Expand Up @@ -65,12 +65,12 @@
<dependency>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>schema</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</dependency>
<dependency>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>security</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.maven/maven-model -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,14 +79,17 @@ public class GameConnection {

private final ServerHMACService hmacService;

private final Channel channel;
private final AtomicReference<Channel> channelRef = new AtomicReference<>();

private final EventLoopGroup group;
private final CountDownLatch connectedLatch = new CountDownLatch(1);

private final EventLoopGroup group;

private final AtomicReference<GameConnectionState> state = new AtomicReference<>();

public GameConnection(final GameServerCreds gameServerCreds) throws IOException {
public GameConnection(
final GameServerCreds gameServerCreds,
final Runnable onConnected) throws IOException {
this.hmacService = new ServerHMACService(gameServerCreds.getPassword());
LOG.info("Start connecting");
state.set(GameConnectionState.CONNECTING);
Expand Down Expand Up @@ -162,34 +166,63 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
});
long startTime = System.currentTimeMillis();
this.channel = bootstrap.connect(
bootstrap.connect(
gameServerCreds.getHostPort().getHost(),
gameServerCreds.getHostPort().getPort()).sync().channel();
LOG.info("Connected to server in {} mls. Fast TCP enabled: {}",
System.currentTimeMillis() - startTime, ClientConfig.FAST_TCP);
pingScheduler.scheduleAtFixedRate(() -> {
try {
if (hasPendingPing.get()) {
LOG.warn("Old ping request is still pending");
return;
}
hasPendingPing.set(true);
pingRequestedTimeMls.set(System.currentTimeMillis());
writeToChannel(PING);
} catch (Exception e) {
LOG.error("Can't ping server", e);
}
}, ClientConfig.SERVER_MAX_INACTIVE_MLS / 3, ClientConfig.SERVER_MAX_INACTIVE_MLS / 3,
TimeUnit.MILLISECONDS);
gameServerCreds.getHostPort().getPort()).addListener((ChannelFutureListener) future -> {
channelRef.set(future.channel());
if (future.isSuccess()) {
LOG.info("Connected to server in {} mls. Fast TCP enabled: {}",
System.currentTimeMillis() - startTime, ClientConfig.FAST_TCP);
schedulePing();
state.set(GameConnectionState.CONNECTED);
connectedLatch.countDown();
onConnected.run();
} else {
LOG.error("Error occurred", future.cause());
errorsQueueAPI.push(future.cause());
disconnect();
}
});

state.set(GameConnectionState.CONNECTED);
} catch (Exception e) {
LOG.error("Error occurred", e);
disconnect();
throw new IOException("Can't connect to " + gameServerCreds.getHostPort(), e);
}
}

public GameConnection(
final GameServerCreds gameServerCreds) throws IOException {
this(gameServerCreds, () -> {
});
}

public boolean waitUntilConnected(int timeoutMls) throws InterruptedException {
return connectedLatch.await(timeoutMls, TimeUnit.MILLISECONDS);
}

public boolean waitUntilConnected() throws InterruptedException {
return waitUntilConnected(60_000);
}

private void schedulePing() {
pingScheduler.scheduleAtFixedRate(() -> {
try {
if (hasPendingPing.get()) {
LOG.warn("Old ping request is still pending");
return;
}
hasPendingPing.set(true);
pingRequestedTimeMls.set(System.currentTimeMillis());
writeToChannel(PING);
} catch (Exception e) {
LOG.error("Can't ping server", e);
}
}, ClientConfig.SERVER_MAX_INACTIVE_MLS / 3,
ClientConfig.SERVER_MAX_INACTIVE_MLS / 3,
TimeUnit.MILLISECONDS);
}

public void shutdownPingScheduler() {
try {
pingScheduler.shutdown();
Expand Down Expand Up @@ -245,14 +278,20 @@ private void writeLocal(GeneratedMessageV3 command) {
}

private void writeToChannel(ServerCommand serverCommand) {
if (state.get() != GameConnectionState.CONNECTED) {
LOG.warn("Can't write to closed channel");
return;
}
LOG.debug("Write {}", serverCommand);
channel.writeAndFlush(serverCommand).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
errorsQueueAPI.push(new IOException("Failed to write command " + serverCommand));
}
Optional.ofNullable(channelRef.get()).ifPresent(channel -> {
channel.writeAndFlush(serverCommand).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
errorsQueueAPI.push(new IOException("Failed to write command " + serverCommand));
}
});
networkStats.incSentMessages();
networkStats.addOutboundPayloadBytes(serverCommand.getSerializedSize());
});
networkStats.incSentMessages();
networkStats.addOutboundPayloadBytes(serverCommand.getSerializedSize());
}

public void disconnect() {
Expand All @@ -268,7 +307,7 @@ public void disconnect() {
LOG.error("Can't shutdown bootstrap group", e);
}
try {
Optional.ofNullable(channel).ifPresent(ChannelOutboundInvoker::close);
Optional.ofNullable(channelRef.get()).ifPresent(ChannelOutboundInvoker::close);
} catch (Exception e) {
LOG.error("Can't close channel", e);
}
Expand Down Expand Up @@ -302,7 +341,6 @@ public NetworkStatsReader getNetworkStats() {
return networkStats;
}


private enum GameConnectionState {
CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>daikombat-server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
<packaging>pom</packaging>

<modules>
Expand Down
4 changes: 2 additions & 2 deletions schema/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
<parent>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>daikombat-server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<artifactId>schema</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>

<properties>
<maven.compiler.source>14</maven.compiler.source>
Expand Down
4 changes: 2 additions & 2 deletions security/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
<parent>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>daikombat-server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<artifactId>security</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>

<properties>
<maven.compiler.source>14</maven.compiler.source>
Expand Down
10 changes: 5 additions & 5 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
<parent>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>daikombat-server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<artifactId>server</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>

<properties>
<maven.compiler.source>14</maven.compiler.source>
Expand Down Expand Up @@ -78,12 +78,12 @@
<dependency>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>schema</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</dependency>
<dependency>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>security</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
</dependency>
<dependency>
<groupId>eu.rekawek.toxiproxy</groupId>
Expand Down Expand Up @@ -122,7 +122,7 @@
<dependency>
<groupId>com.beverly.hills.money.gang</groupId>
<artifactId>net-client</artifactId>
<version>5.0.0</version>
<version>5.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ protected GameConnection createGameConnection(String password, String host, int
.hostPort(HostPort.builder().host(host).port(port).build())
.build());
gameConnections.add(gameConnection);
try {
gameConnection.waitUntilConnected(5_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return gameConnection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.beverly.hills.money.gang.config.ServerConfig;
Expand All @@ -11,6 +11,8 @@
import com.beverly.hills.money.gang.proto.JoinGameCommand;
import com.beverly.hills.money.gang.proto.ServerResponse;
import java.io.IOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;

Expand Down Expand Up @@ -104,9 +106,9 @@ public void testDisconnectTwice() throws IOException, InterruptedException {
* @then an exception is thrown
*/
@Test
public void testGetServerInfoNotExistingServer() {
assertThrows(IOException.class,
() -> createGameConnection(ServerConfig.PIN_CODE, "666.666.666.666", port));
public void testGetServerInfoNotExistingServer() throws IOException {
var connection = createGameConnection(ServerConfig.PIN_CODE, "666.666.666.666", port);
assertInstanceOf(UnknownHostException.class, connection.getErrors().poll().get());
}

/**
Expand All @@ -115,8 +117,8 @@ public void testGetServerInfoNotExistingServer() {
* @then an exception is thrown
*/
@Test
public void testGetServerInfoWrongPort() {
assertThrows(IOException.class,
() -> createGameConnection(ServerConfig.PIN_CODE, "localhost", 666));
public void testGetServerInfoWrongPort() throws IOException {
var connection = createGameConnection(ServerConfig.PIN_CODE, "localhost", 666);
assertInstanceOf(ConnectException.class, connection.getErrors().poll().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testJoinGameAfterManyPlayersJoined() throws Exception {
* @when a player connects to a server using wrong game id
* @then the player is not connected
*/
@Test
@RepeatedTest(16)
public void testJoinGameNotExistingGame() throws IOException {
int gameIdToConnectTo = 666;
GameConnection gameConnection = createGameConnection(ServerConfig.PIN_CODE, "localhost", port);
Expand Down

0 comments on commit 405a914

Please sign in to comment.