Skip to content

Commit

Permalink
[DE-725] Bugfix VST resilience (#529)
Browse files Browse the repository at this point in the history
* increased wait time in resilience tests

* CI: disable tests for draft PRs

* refactoring VstCommunication

* refactoring VstConnection

* auth fail test

* unify VST and HTTP communication

* test fix

* rm disabled consumer thread tests
  • Loading branch information
rashtao authored Dec 1, 2023
1 parent 3e80267 commit 3416000
Show file tree
Hide file tree
Showing 22 changed files with 420 additions and 550 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/resilience.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ jobs:
env:
TOXIPROXY_VERSION: v2.7.0

strategy:
fail-fast: false

steps:
- uses: actions/checkout@v2
- name: Set up JDK
Expand Down
7 changes: 7 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
jobs:

test:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -63,6 +64,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}"

test-ssl:
if: '! github.event.pull_request.draft'
timeout-minutes: 10
runs-on: ubuntu-latest

Expand Down Expand Up @@ -98,6 +100,7 @@ jobs:

# test encodeURIComponent() and normalize('NFC') comparing to Javascript behavior
test-graalvm:
if: '! github.event.pull_request.draft'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand All @@ -113,6 +116,7 @@ jobs:
run: mvn -e --no-transfer-progress -am -pl driver test -Dtest=graalvm.UnicodeUtilsTest -Dsurefire.failIfNoSpecifiedTests=false

test-jwt:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -160,6 +164,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}"

jackson-test:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -205,6 +210,7 @@ jobs:
run: mvn --no-transfer-progress -am -pl driver test -Dadb.jackson.version=${{matrix.jackson-version}}

integration-tests:
if: '! github.event.pull_request.draft'
timeout-minutes: 20
runs-on: ubuntu-latest

Expand Down Expand Up @@ -250,6 +256,7 @@ jobs:
run: mvn --no-transfer-progress -Pplain test

sonar:
if: '! github.event.pull_request.draft'
timeout-minutes: 10
runs-on: ubuntu-latest

Expand Down
161 changes: 161 additions & 0 deletions core/src/main/java/com/arangodb/internal/net/Communication.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.arangodb.internal.net;

import com.arangodb.ArangoDBException;
import com.arangodb.config.HostDescription;
import com.arangodb.internal.InternalRequest;
import com.arangodb.internal.InternalResponse;
import com.arangodb.internal.RequestType;
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.serde.InternalSerde;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.util.RequestUtils;
import com.arangodb.internal.util.ResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public abstract class Communication implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(Communication.class);
protected final HostHandler hostHandler;
protected final InternalSerde serde;
private final AtomicLong reqCount;


protected Communication(final ArangoConfig config, final HostHandler hostHandler) {
this.hostHandler = hostHandler;
serde = config.getInternalSerde();
reqCount = new AtomicLong();
}

protected abstract void connect(final Connection conn) throws IOException;

@Override
public void close() throws IOException {
hostHandler.close();
}

public CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle) {
return executeAsync(request, hostHandle, hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), 0);
}

private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
long reqId = reqCount.getAndIncrement();
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
}

private CompletableFuture<InternalResponse> doExecuteAsync(
final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount, Connection connection, long reqId
) {
if (LOGGER.isDebugEnabled()) {
String body = request.getBody() == null ? "" : serde.toJsonString(request.getBody());
LOGGER.debug("Send Request [id={}]: {} {}", reqId, request, body);
}
final CompletableFuture<InternalResponse> rfuture = new CompletableFuture<>();
try {
connect(connection);
} catch (IOException e) {
handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture);
return rfuture;
}

connection.executeAsync(request)
.whenComplete((response, e) -> {
try {
if (e instanceof SocketTimeoutException) {
// SocketTimeoutException exceptions are wrapped and rethrown.
TimeoutException te = new TimeoutException(e.getMessage());
te.initCause(e);
rfuture.completeExceptionally(ArangoDBException.of(te, reqId));
} else if (e instanceof TimeoutException) {
rfuture.completeExceptionally(ArangoDBException.of(e, reqId));
} else if (e instanceof ConnectException) {
handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture);
} else if (e != null) {
handleException(isSafe(request), e, hostHandle, request, host, reqId, attemptCount, rfuture);
} else {
if (LOGGER.isDebugEnabled()) {
String body = response.getBody() == null ? "" : serde.toJsonString(response.getBody());
LOGGER.debug("Received Response [id={}]: {} {}", reqId, response, body);
}
ArangoDBException errorEntityEx = ResponseUtils.translateError(serde, response);
if (errorEntityEx instanceof ArangoDBRedirectException) {
if (attemptCount >= 3) {
rfuture.completeExceptionally(errorEntityEx);
} else {
final String location = ((ArangoDBRedirectException) errorEntityEx).getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.failIfNotMatch(redirectHost, errorEntityEx);
mirror(
executeAsync(request, new HostHandle().setHost(redirectHost), hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), attemptCount + 1),
rfuture
);
}
} else if (errorEntityEx != null) {
rfuture.completeExceptionally(errorEntityEx);
} else {
hostHandler.success();
rfuture.complete(response);
}
}
} catch (Exception ex) {
rfuture.completeExceptionally(ArangoDBException.of(ex, reqId));
}
});
return rfuture;
}

private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle, InternalRequest request, Host host,
long reqId, int attemptCount, CompletableFuture<InternalResponse> rfuture) {
IOException ioEx = wrapIOEx(e);
hostHandler.fail(ioEx);
if (hostHandle != null && hostHandle.getHost() != null) {
hostHandle.setHost(null);
}
hostHandler.checkNext(hostHandle, RequestUtils.determineAccessType(request));
if (isSafe) {
Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request));
LOGGER.warn("Could not connect to {} while executing request [id={}]",
host.getDescription(), reqId, ioEx);
LOGGER.debug("Try connecting to {}", nextHost.getDescription());
mirror(
executeAsync(request, hostHandle, nextHost, attemptCount),
rfuture
);
} else {
ArangoDBException aEx = ArangoDBException.of(ioEx, reqId);
rfuture.completeExceptionally(aEx);
}
}

private void mirror(CompletableFuture<InternalResponse> up, CompletableFuture<InternalResponse> down) {
up.whenComplete((v, err) -> {
if (err != null) {
down.completeExceptionally(err instanceof CompletionException ? err.getCause() : err);
} else {
down.complete(v);
}
});
}

private static IOException wrapIOEx(Throwable t) {
if (t instanceof IOException) {
return (IOException) t;
} else {
return new IOException(t);
}
}

private boolean isSafe(final InternalRequest request) {
RequestType type = request.getRequestType();
return type == RequestType.GET || type == RequestType.HEAD || type == RequestType.OPTIONS;
}

}
6 changes: 6 additions & 0 deletions core/src/main/java/com/arangodb/internal/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

package com.arangodb.internal.net;

import com.arangodb.internal.InternalRequest;
import com.arangodb.internal.InternalResponse;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
*/
public interface Connection extends Closeable {
void setJwt(String jwt);

CompletableFuture<InternalResponse> executeAsync(InternalRequest request);
}
88 changes: 0 additions & 88 deletions driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java
Original file line number Diff line number Diff line change
@@ -1,101 +1,13 @@
package com.arangodb;

import com.arangodb.config.ArangoConfigProperties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;

public class ConsumerThreadAsyncTest extends BaseJunit5 {

private volatile Thread thread;

private void setThread() {
thread = Thread.currentThread();
}

private void sleep() {
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@ParameterizedTest
@EnumSource(Protocol.class)
@Disabled
void defaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.build()
.async();

adb.getVersion()
.thenAccept(it -> setThread())
.get();

adb.shutdown();

if (Protocol.VST.equals(protocol)) {
assertThat(thread.getName()).startsWith("adb-vst-");
} else {
assertThat(thread.getName()).startsWith("adb-http-");
}
}

@ParameterizedTest
@EnumSource(Protocol.class)
void customConsumerExecutor(Protocol protocol) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool(r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("custom-" + UUID.randomUUID());
return t;
});
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.asyncExecutor(es)
.build()
.async();

adb.getVersion()
.thenAccept(it -> setThread())
.get();

adb.shutdown();
es.shutdown();
assertThat(thread.getName()).startsWith("custom-");
}

/**
* Generates warns from Vert.x BlockedThreadChecker
*/
@ParameterizedTest
@EnumSource(Protocol.class)
@Disabled
void sleepOnDefaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException {
ArangoDBAsync adb = new ArangoDB.Builder()
.loadProperties(ArangoConfigProperties.fromFile())
.protocol(protocol)
.maxConnections(1)
.build()
.async();

adb.getVersion()
.thenAccept(it -> sleep())
.get();

adb.shutdown();
}

@ParameterizedTest
@EnumSource(Protocol.class)
void nestedRequests(Protocol protocol) throws ExecutionException, InterruptedException {
Expand Down
Loading

0 comments on commit 3416000

Please sign in to comment.