Skip to content

Commit

Permalink
Active Failover Asynchronous driver (#381)
Browse files Browse the repository at this point in the history
* fixed active failover communication in async driver

* fixed active failover dirty read test

* fixed active failover communication in async driver

* fixed async StreamTransactionGraphTest

* fixed race conditions at VST MessageStore.clear

* enhanced VST connection debug logging

* set default acquireHostList=true for tests

* fixed race condition in ConnectionPool

* added active-failover to GH actions

* fixed some active-failover test failures

* skipping getLogs test for ArangoDB <= 3.6 (BTS-362)

* skipping getLogs test for ArangoDB <= 3.6 (BTS-362)

* documented active failover acquireHostList and loadBalancingStrategy configuration

* changelog upd
  • Loading branch information
rashtao authored Apr 9, 2021
1 parent 339c834 commit 159b9c7
Show file tree
Hide file tree
Showing 19 changed files with 195 additions and 55 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
topology:
- single
- cluster
- active-failover

steps:
- uses: actions/checkout@v1
Expand Down
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a

## [Unreleased]

- fixed active failover behavior for the asynchronous driver

## [6.10.0] - 2021-03-27

- closing VST connection after 3 consecutive keepAlive failures (#ES-837)
Expand Down
7 changes: 7 additions & 0 deletions docker/clean_active-failover.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

for c in server1 \
server2 \
server3; do
docker rm -f $c
done
70 changes: 70 additions & 0 deletions docker/start_db_active-failover.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/bin/bash

# USAGE:
# export ARANGO_LICENSE_KEY=<arangodb-enterprise-license>
# ./start_active-failover.sh <dockerImage>

# EXAMPLE:
# ./start_active-failover.sh docker.io/arangodb/arangodb:3.7.10

docker pull "$1"

LOCATION=$(pwd)/$(dirname "$0")

docker network create arangodb --subnet 172.28.0.0/16

echo "Averysecretword" >"$LOCATION"/jwtSecret
docker run --rm -v "$LOCATION"/jwtSecret:/jwtSecret "$1" arangodb auth header --auth.jwt-secret /jwtSecret >"$LOCATION"/jwtHeader
AUTHORIZATION_HEADER=$(cat "$LOCATION"/jwtHeader)

echo "Starting containers..."

docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.1 --name server1 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'
docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.2 --name server2 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'
docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.3 --name server3 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'

debug_container() {
running=$(docker inspect -f '{{.State.Running}}' "$1")

if [ "$running" = false ]; then
echo "$1 is not running!"
echo "---"
docker logs "$1"
echo "---"
exit 1
fi
}

debug() {
for c in server1 \
server2 \
server3; do
debug_container $c
done
}

wait_server() {
# shellcheck disable=SC2091
until $(curl --output /dev/null --silent --head --fail -i -H "$AUTHORIZATION_HEADER" "http://$1/_api/version"); do
printf '.'
debug
sleep 1
done
}

echo "Waiting..."

# Wait for agents:
for a in 172.28.3.1:8529 \
172.28.3.2:8529 \
172.28.3.3:8529; do
wait_server $a
done

docker exec server1 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'
docker exec server2 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'
docker exec server3 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'

#rm "$LOCATION"/jwtHeader "$LOCATION"/jwtSecret

echo "Done, your cluster is ready."
16 changes: 16 additions & 0 deletions docker/start_db_active-failover_retry_fail.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

# USAGE:
# export ARANGO_LICENSE_KEY=<arangodb-enterprise-license>
# ./docker/start_db_active-failover_retry_fail.sh <dockerImage>

# EXAMPLE:
# ./docker/start_db_active-failover_retry_fail.sh docker.io/arangodb/arangodb:3.7.10

./docker/start_db_active-failover.sh "$1"
while [ $? -ne 0 ]; do
echo "=== === ==="
echo "active-failover startup failed, retrying ..."
./docker/clean_active-failover.sh
./docker/start_db_active-failover.sh "$1"
done
3 changes: 3 additions & 0 deletions src/main/java/com/arangodb/ArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public Builder keepAliveInterval(final Integer keepAliveInterval) {
/**
* Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single
* server with active failover.
* In case of Active-Failover deployment set to {@code true} to enable automatic master discovery.
*
* <p>
* The host list will be used for failover and load balancing.
Expand All @@ -312,6 +313,8 @@ public Builder acquireHostListInterval(final Integer acquireHostListInterval) {

/**
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
* would be the default.
*
* @param loadBalancingStrategy the load balancing strategy to be used (default: {@link LoadBalancingStrategy#NONE}
* @return {@link ArangoDB.Builder}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/arangodb/async/ArangoDBAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ public Builder keepAliveInterval(final Integer keepAliveInterval) {
/**
* Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single
* server with active failover.
* In case of Active-Failover deployment set to {@code true} to enable automatic master discovery.
*
* <p>
* The host list will be used for failover and load balancing.
Expand All @@ -457,6 +458,8 @@ public Builder acquireHostList(final Boolean acquireHostList) {

/**
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
* would be the default.
*
* @param loadBalancingStrategy the load balancing strategy to be used (default: {@link LoadBalancingStrategy#NONE}
* @return {@link ArangoDBAsync.Builder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package com.arangodb.async.internal.velocystream;

import com.arangodb.ArangoDBException;
import com.arangodb.entity.ErrorEntity;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.velocystream.VstCommunication;
import com.arangodb.internal.velocystream.internal.AuthenticationRequest;
import com.arangodb.internal.velocystream.internal.Message;
Expand Down Expand Up @@ -58,23 +61,37 @@ protected CompletableFuture<Response> execute(final Request request, final VstCo
final Message message = createMessage(request);
send(message, connection).whenComplete((m, ex) -> {
if (m != null) {
final Response response;
try {
final Response response = createResponse(m);
if (response.getResponseCode() >= 300) {
if (response.getBody() != null) {
final ErrorEntity errorEntity = util.deserialize(response.getBody(), ErrorEntity.class);
rfuture.completeExceptionally(new ArangoDBException(errorEntity));
} else {
rfuture.completeExceptionally(new ArangoDBException(
String.format("Response Code: %s", response.getResponseCode()), response.getResponseCode()));
}
} else {
rfuture.complete(response);
}
response = createResponse(m);
} catch (final VPackParserException e) {
LOGGER.error(e.getMessage(), e);
rfuture.completeExceptionally(e);
return;
}

try {
checkError(response);
} catch (final ArangoDBRedirectException e) {
final String location = e.getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
execute(request, new HostHandle().setHost(redirectHost))
.whenComplete((v, err) -> {
if (v != null) {
rfuture.complete(v);
} else if (err != null) {
rfuture.completeExceptionally(err);
} else {
rfuture.cancel(true);
}
});
return;
} catch (ArangoDBException e) {
rfuture.completeExceptionally(e);
}
rfuture.complete(response);
} else if (ex != null) {
LOGGER.error(ex.getMessage(), ex);
rfuture.completeExceptionally(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public synchronized Connection connection() {
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
for (final Connection connection : connections) {
connection.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@
import com.arangodb.ArangoDBException;
import com.arangodb.internal.ArangoDefaults;
import com.arangodb.internal.net.AccessType;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.Host;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.util.RequestUtils;
import com.arangodb.internal.util.ResponseUtils;
import com.arangodb.internal.velocystream.internal.Chunk;
Expand Down Expand Up @@ -64,7 +61,7 @@ public abstract class VstCommunication<R, C extends VstConnection> implements Cl
protected final String password;

protected final Integer chunksize;
private final HostHandler hostHandler;
protected final HostHandler hostHandler;

protected VstCommunication(final Integer timeout, final String user, final String password, final Boolean useSsl,
final SSLContext sslContext, final ArangoSerialization util, final Integer chunksize,
Expand Down Expand Up @@ -134,20 +131,8 @@ public void close() throws IOException {
}

public R execute(final Request request, final HostHandle hostHandle) throws ArangoDBException {
try {
final C connection = connect(hostHandle, RequestUtils.determineAccessType(request));
return execute(request, connection);
} catch (final ArangoDBException e) {
if (e instanceof ArangoDBRedirectException) {
final String location = ((ArangoDBRedirectException) e).getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
return execute(request, new HostHandle().setHost(redirectHost));
} else {
throw e;
}
}
final C connection = connect(hostHandle, RequestUtils.determineAccessType(request));
return execute(request, connection);
}

protected abstract R execute(final Request request, C connection) throws ArangoDBException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
package com.arangodb.internal.velocystream;

import com.arangodb.ArangoDBException;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.velocystream.internal.AuthenticationRequest;
import com.arangodb.internal.velocystream.internal.Message;
import com.arangodb.internal.velocystream.internal.VstConnectionSync;
Expand Down Expand Up @@ -127,6 +131,12 @@ protected Response execute(final Request request, final VstConnectionSync connec
return response;
} catch (final VPackParserException e) {
throw new ArangoDBException(e);
} catch (final ArangoDBRedirectException e) {
final String location = e.getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
return execute(request, new HostHandle().setHost(redirectHost));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void cancel(final long messageId) {
}
}

public void clear(final Exception e) {
public synchronized void clear(final Exception e) {
if (!task.isEmpty()) {
LOGGER.error(e.getMessage(), e);
}
Expand All @@ -96,7 +96,7 @@ public void clear(final Exception e) {
task.clear();
}

public void clear() {
public synchronized void clear() {
for (final Entry<Long, FutureTask<Message>> entry : task.entrySet()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Cancel Message (id=%s).", entry.getKey()));
Expand Down
Loading

0 comments on commit 159b9c7

Please sign in to comment.