Skip to content

Commit

Permalink
fix(client): close client if service is unavailable (#123)
Browse files Browse the repository at this point in the history
* close the client if it lost the connection to the Hazelcast server and reached the connection timeout
  • Loading branch information
saig0 authored Aug 2, 2021
1 parent a8f605f commit eec996b
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.zeebe.hazelcast.connect.java;

import com.google.protobuf.InvalidProtocolBufferException;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
Expand Down Expand Up @@ -76,6 +77,10 @@ private void start() {
future = executorService.submit(this::readFromBuffer);
}

public boolean isClosed() {
return isClosed;
}

/** Stop reading from the ringbuffer. */
@Override
public void close() throws Exception {
Expand Down Expand Up @@ -146,6 +151,15 @@ private void readNext() {

sequence = headSequence;

} catch (HazelcastClientNotActiveException e) {
LOGGER.warn("Lost connection to the Hazelcast server", e);

try {
close();
} catch (Exception closingFailure) {
LOGGER.debug("Failure while closing the client", closingFailure);
}

} catch (InterruptedException e) {
LOGGER.debug("Interrupted while reading from ring-buffer with sequence '{}'", sequence);
throw new RuntimeException("Interrupted while reading from ring-buffer", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.zeebe.hazelcast;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.hazelcast.exporter.ExporterConfiguration;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

public class ZeebeHazelcastClientTest {

private static final ExporterConfiguration CONFIGURATION = new ExporterConfiguration();
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(5);

private ZeebeHazelcast zeebeHazelcast;

private HazelcastInstance hzInstance;
private HazelcastInstance hzClient;

@Before
public void init() {
final Config config = new Config();
config.getNetworkConfig().setPort(5702);
hzInstance = Hazelcast.newHazelcastInstance(config);

final ClientConfig clientConfig = new ClientConfig();

final var connectionRetryConfig =
clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig();
connectionRetryConfig.setClusterConnectTimeoutMillis(CONNECTION_TIMEOUT.toMillis());

clientConfig.getNetworkConfig().addAddress("127.0.0.1:5702");
hzClient = HazelcastClient.newHazelcastClient(clientConfig);
}

@After
public void cleanUp() throws Exception {
zeebeHazelcast.close();
hzClient.shutdown();
hzInstance.shutdown();
}

@Test
public void shouldCloseIfHazelcastIsUnavailable() {
// given
zeebeHazelcast = ZeebeHazelcast.newBuilder(hzClient).build();

// when
hzInstance.shutdown();

// then
Awaitility.await()
.atMost(CONNECTION_TIMEOUT.multipliedBy(2))
.untilAsserted(() -> assertThat(zeebeHazelcast.isClosed()).isTrue());
}
}

0 comments on commit eec996b

Please sign in to comment.