From 34160009e5d7359bc96f76aa4784560ef024d7a1 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 1 Dec 2023 21:49:09 +0100 Subject: [PATCH] [DE-725] Bugfix VST resilience (#529) * increased wait time in resilience tests * CI: disable tests for draft PRs * refactoring VstCommunication * refactoring VstConnection * auth fail test * unify VST and HTTP communication * test fix * rm disabled consumer thread tests --- .github/workflows/resilience.yml | 3 - .github/workflows/test.yml | 7 + .../arangodb/internal/net/Communication.java | 161 ++++++++++++++++ .../com/arangodb/internal/net/Connection.java | 6 + .../com/arangodb/ConsumerThreadAsyncTest.java | 88 --------- .../com/arangodb/http/HttpCommunication.java | 145 +-------------- .../com/arangodb/http/HttpConnection.java | 1 + .../java/com/arangodb/http/HttpProtocol.java | 3 +- .../arangodb/http/HttpProtocolProvider.java | 2 +- .../src/test/java/resilience/ClusterTest.java | 18 +- .../resilience/connection/ConnectionTest.java | 34 ++++ .../LoadBalanceNoneClusterTest.java | 31 +++- .../LoadBalanceRoundRobinClusterTest.java | 34 ++-- .../resilience/retry/RetriableCursorTest.java | 5 +- .../resilience/retry/RetryClusterTest.java | 2 - .../test/java/resilience/retry/RetryTest.java | 4 - .../vstKeepAlive/VstKeepAliveCloseTest.java | 4 +- .../com/arangodb/vst/VstCommunication.java | 173 ++++-------------- .../arangodb/vst/VstCommunicationAsync.java | 146 --------------- .../java/com/arangodb/vst/VstProtocol.java | 6 +- .../com/arangodb/vst/VstProtocolProvider.java | 2 +- .../vst/internal/VstConnectionAsync.java | 95 ++++++++++ 22 files changed, 420 insertions(+), 550 deletions(-) create mode 100644 core/src/main/java/com/arangodb/internal/net/Communication.java delete mode 100644 vst/src/main/java/com/arangodb/vst/VstCommunicationAsync.java diff --git a/.github/workflows/resilience.yml b/.github/workflows/resilience.yml index 1b0017d8a..5972cf031 100644 --- a/.github/workflows/resilience.yml +++ b/.github/workflows/resilience.yml @@ -25,9 +25,6 @@ jobs: env: TOXIPROXY_VERSION: v2.7.0 - strategy: - fail-fast: false - steps: - uses: actions/checkout@v2 - name: Set up JDK diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8ee4d7254..d2009c359 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,6 +20,7 @@ on: jobs: test: + if: '! github.event.pull_request.draft' timeout-minutes: 20 runs-on: ubuntu-latest @@ -63,6 +64,7 @@ jobs: run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}" test-ssl: + if: '! github.event.pull_request.draft' timeout-minutes: 10 runs-on: ubuntu-latest @@ -98,6 +100,7 @@ jobs: # test encodeURIComponent() and normalize('NFC') comparing to Javascript behavior test-graalvm: + if: '! github.event.pull_request.draft' runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -113,6 +116,7 @@ jobs: run: mvn -e --no-transfer-progress -am -pl driver test -Dtest=graalvm.UnicodeUtilsTest -Dsurefire.failIfNoSpecifiedTests=false test-jwt: + if: '! github.event.pull_request.draft' timeout-minutes: 20 runs-on: ubuntu-latest @@ -160,6 +164,7 @@ jobs: run: mvn --no-transfer-progress -am -pl driver test -DargLine="-Duser.language=${{matrix.user-language}}" jackson-test: + if: '! github.event.pull_request.draft' timeout-minutes: 20 runs-on: ubuntu-latest @@ -205,6 +210,7 @@ jobs: run: mvn --no-transfer-progress -am -pl driver test -Dadb.jackson.version=${{matrix.jackson-version}} integration-tests: + if: '! github.event.pull_request.draft' timeout-minutes: 20 runs-on: ubuntu-latest @@ -250,6 +256,7 @@ jobs: run: mvn --no-transfer-progress -Pplain test sonar: + if: '! github.event.pull_request.draft' timeout-minutes: 10 runs-on: ubuntu-latest diff --git a/core/src/main/java/com/arangodb/internal/net/Communication.java b/core/src/main/java/com/arangodb/internal/net/Communication.java new file mode 100644 index 000000000..781da6336 --- /dev/null +++ b/core/src/main/java/com/arangodb/internal/net/Communication.java @@ -0,0 +1,161 @@ +package com.arangodb.internal.net; + +import com.arangodb.ArangoDBException; +import com.arangodb.config.HostDescription; +import com.arangodb.internal.InternalRequest; +import com.arangodb.internal.InternalResponse; +import com.arangodb.internal.RequestType; +import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.serde.InternalSerde; +import com.arangodb.internal.util.HostUtils; +import com.arangodb.internal.util.RequestUtils; +import com.arangodb.internal.util.ResponseUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class Communication implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(Communication.class); + protected final HostHandler hostHandler; + protected final InternalSerde serde; + private final AtomicLong reqCount; + + + protected Communication(final ArangoConfig config, final HostHandler hostHandler) { + this.hostHandler = hostHandler; + serde = config.getInternalSerde(); + reqCount = new AtomicLong(); + } + + protected abstract void connect(final Connection conn) throws IOException; + + @Override + public void close() throws IOException { + hostHandler.close(); + } + + public CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle) { + return executeAsync(request, hostHandle, hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), 0); + } + + private CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) { + long reqId = reqCount.getAndIncrement(); + return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId); + } + + private CompletableFuture doExecuteAsync( + final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount, Connection connection, long reqId + ) { + if (LOGGER.isDebugEnabled()) { + String body = request.getBody() == null ? "" : serde.toJsonString(request.getBody()); + LOGGER.debug("Send Request [id={}]: {} {}", reqId, request, body); + } + final CompletableFuture rfuture = new CompletableFuture<>(); + try { + connect(connection); + } catch (IOException e) { + handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture); + return rfuture; + } + + connection.executeAsync(request) + .whenComplete((response, e) -> { + try { + if (e instanceof SocketTimeoutException) { + // SocketTimeoutException exceptions are wrapped and rethrown. + TimeoutException te = new TimeoutException(e.getMessage()); + te.initCause(e); + rfuture.completeExceptionally(ArangoDBException.of(te, reqId)); + } else if (e instanceof TimeoutException) { + rfuture.completeExceptionally(ArangoDBException.of(e, reqId)); + } else if (e instanceof ConnectException) { + handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture); + } else if (e != null) { + handleException(isSafe(request), e, hostHandle, request, host, reqId, attemptCount, rfuture); + } else { + if (LOGGER.isDebugEnabled()) { + String body = response.getBody() == null ? "" : serde.toJsonString(response.getBody()); + LOGGER.debug("Received Response [id={}]: {} {}", reqId, response, body); + } + ArangoDBException errorEntityEx = ResponseUtils.translateError(serde, response); + if (errorEntityEx instanceof ArangoDBRedirectException) { + if (attemptCount >= 3) { + rfuture.completeExceptionally(errorEntityEx); + } else { + final String location = ((ArangoDBRedirectException) errorEntityEx).getLocation(); + final HostDescription redirectHost = HostUtils.createFromLocation(location); + hostHandler.failIfNotMatch(redirectHost, errorEntityEx); + mirror( + executeAsync(request, new HostHandle().setHost(redirectHost), hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), attemptCount + 1), + rfuture + ); + } + } else if (errorEntityEx != null) { + rfuture.completeExceptionally(errorEntityEx); + } else { + hostHandler.success(); + rfuture.complete(response); + } + } + } catch (Exception ex) { + rfuture.completeExceptionally(ArangoDBException.of(ex, reqId)); + } + }); + return rfuture; + } + + private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle, InternalRequest request, Host host, + long reqId, int attemptCount, CompletableFuture rfuture) { + IOException ioEx = wrapIOEx(e); + hostHandler.fail(ioEx); + if (hostHandle != null && hostHandle.getHost() != null) { + hostHandle.setHost(null); + } + hostHandler.checkNext(hostHandle, RequestUtils.determineAccessType(request)); + if (isSafe) { + Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)); + LOGGER.warn("Could not connect to {} while executing request [id={}]", + host.getDescription(), reqId, ioEx); + LOGGER.debug("Try connecting to {}", nextHost.getDescription()); + mirror( + executeAsync(request, hostHandle, nextHost, attemptCount), + rfuture + ); + } else { + ArangoDBException aEx = ArangoDBException.of(ioEx, reqId); + rfuture.completeExceptionally(aEx); + } + } + + private void mirror(CompletableFuture up, CompletableFuture down) { + up.whenComplete((v, err) -> { + if (err != null) { + down.completeExceptionally(err instanceof CompletionException ? err.getCause() : err); + } else { + down.complete(v); + } + }); + } + + private static IOException wrapIOEx(Throwable t) { + if (t instanceof IOException) { + return (IOException) t; + } else { + return new IOException(t); + } + } + + private boolean isSafe(final InternalRequest request) { + RequestType type = request.getRequestType(); + return type == RequestType.GET || type == RequestType.HEAD || type == RequestType.OPTIONS; + } + +} diff --git a/core/src/main/java/com/arangodb/internal/net/Connection.java b/core/src/main/java/com/arangodb/internal/net/Connection.java index c2701361e..55396f6b5 100644 --- a/core/src/main/java/com/arangodb/internal/net/Connection.java +++ b/core/src/main/java/com/arangodb/internal/net/Connection.java @@ -20,11 +20,17 @@ package com.arangodb.internal.net; +import com.arangodb.internal.InternalRequest; +import com.arangodb.internal.InternalResponse; + import java.io.Closeable; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary */ public interface Connection extends Closeable { void setJwt(String jwt); + + CompletableFuture executeAsync(InternalRequest request); } diff --git a/driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java b/driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java index c58a4a815..b241f5f5c 100644 --- a/driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java +++ b/driver/src/test/java/com/arangodb/ConsumerThreadAsyncTest.java @@ -1,101 +1,13 @@ package com.arangodb; import com.arangodb.config.ArangoConfigProperties; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.assertj.core.api.Assertions.assertThat; public class ConsumerThreadAsyncTest extends BaseJunit5 { - private volatile Thread thread; - - private void setThread() { - thread = Thread.currentThread(); - } - - private void sleep() { - try { - Thread.sleep(3_000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @ParameterizedTest - @EnumSource(Protocol.class) - @Disabled - void defaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException { - ArangoDBAsync adb = new ArangoDB.Builder() - .loadProperties(ArangoConfigProperties.fromFile()) - .protocol(protocol) - .build() - .async(); - - adb.getVersion() - .thenAccept(it -> setThread()) - .get(); - - adb.shutdown(); - - if (Protocol.VST.equals(protocol)) { - assertThat(thread.getName()).startsWith("adb-vst-"); - } else { - assertThat(thread.getName()).startsWith("adb-http-"); - } - } - - @ParameterizedTest - @EnumSource(Protocol.class) - void customConsumerExecutor(Protocol protocol) throws ExecutionException, InterruptedException { - ExecutorService es = Executors.newCachedThreadPool(r -> { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("custom-" + UUID.randomUUID()); - return t; - }); - ArangoDBAsync adb = new ArangoDB.Builder() - .loadProperties(ArangoConfigProperties.fromFile()) - .protocol(protocol) - .asyncExecutor(es) - .build() - .async(); - - adb.getVersion() - .thenAccept(it -> setThread()) - .get(); - - adb.shutdown(); - es.shutdown(); - assertThat(thread.getName()).startsWith("custom-"); - } - - /** - * Generates warns from Vert.x BlockedThreadChecker - */ - @ParameterizedTest - @EnumSource(Protocol.class) - @Disabled - void sleepOnDefaultConsumerThread(Protocol protocol) throws ExecutionException, InterruptedException { - ArangoDBAsync adb = new ArangoDB.Builder() - .loadProperties(ArangoConfigProperties.fromFile()) - .protocol(protocol) - .maxConnections(1) - .build() - .async(); - - adb.getVersion() - .thenAccept(it -> sleep()) - .get(); - - adb.shutdown(); - } - @ParameterizedTest @EnumSource(Protocol.class) void nestedRequests(Protocol protocol) throws ExecutionException, InterruptedException { diff --git a/http/src/main/java/com/arangodb/http/HttpCommunication.java b/http/src/main/java/com/arangodb/http/HttpCommunication.java index 7166ae3d5..ecd97b86c 100644 --- a/http/src/main/java/com/arangodb/http/HttpCommunication.java +++ b/http/src/main/java/com/arangodb/http/HttpCommunication.java @@ -20,157 +20,26 @@ package com.arangodb.http; -import com.arangodb.ArangoDBException; -import com.arangodb.config.HostDescription; -import com.arangodb.internal.InternalRequest; -import com.arangodb.internal.InternalResponse; -import com.arangodb.internal.RequestType; import com.arangodb.internal.config.ArangoConfig; -import com.arangodb.internal.net.ArangoDBRedirectException; -import com.arangodb.internal.net.Host; -import com.arangodb.internal.net.HostHandle; +import com.arangodb.internal.net.Communication; +import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.HostHandler; -import com.arangodb.internal.serde.InternalSerde; -import com.arangodb.internal.util.HostUtils; -import com.arangodb.internal.util.RequestUtils; -import com.arangodb.internal.util.ResponseUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; /** * @author Mark Vollmary * @author Michele Rastelli */ -public class HttpCommunication implements Closeable { +public class HttpCommunication extends Communication { - private static final Logger LOGGER = LoggerFactory.getLogger(HttpCommunication.class); - private final HostHandler hostHandler; - private final InternalSerde serde; - private final AtomicLong reqCount; - - HttpCommunication(final HostHandler hostHandler, final ArangoConfig config) { - super(); - this.hostHandler = hostHandler; - this.serde = config.getInternalSerde(); - reqCount = new AtomicLong(); + HttpCommunication(final ArangoConfig config, final HostHandler hostHandler) { + super(config, hostHandler); } @Override - public void close() throws IOException { - hostHandler.close(); - } - - public CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle) { - return executeAsync(request, hostHandle, hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), 0); - } - - private CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) { - final CompletableFuture rfuture = new CompletableFuture<>(); - long reqId = reqCount.getAndIncrement(); - final HttpConnection connection = (HttpConnection) host.connection(); - if (LOGGER.isDebugEnabled()) { - String body = request.getBody() == null ? "" : serde.toJsonString(request.getBody()); - LOGGER.debug("Send Request [id={}]: {} {}", reqId, request, body); - } - connection.executeAsync(request) - .whenComplete((response, e) -> { - try { - if (e instanceof SocketTimeoutException) { - // SocketTimeoutException exceptions are wrapped and rethrown. - TimeoutException te = new TimeoutException(e.getMessage()); - te.initCause(e); - rfuture.completeExceptionally(ArangoDBException.of(te, reqId)); - } else if (e instanceof TimeoutException) { - rfuture.completeExceptionally(ArangoDBException.of(e, reqId)); - } else if (e instanceof ConnectException) { - handleException(true, e, hostHandle, request, host, reqId, attemptCount, rfuture); - } else if (e != null) { - handleException(isSafe(request), e, hostHandle, request, host, reqId, attemptCount, rfuture); - } else { - if (LOGGER.isDebugEnabled()) { - String body = response.getBody() == null ? "" : serde.toJsonString(response.getBody()); - LOGGER.debug("Received Response [id={}]: {} {}", reqId, response, body); - } - ArangoDBException errorEntityEx = ResponseUtils.translateError(serde, response); - if (errorEntityEx instanceof ArangoDBRedirectException) { - if (attemptCount >= 3) { - rfuture.completeExceptionally(errorEntityEx); - } else { - final String location = ((ArangoDBRedirectException) errorEntityEx).getLocation(); - final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.failIfNotMatch(redirectHost, errorEntityEx); - mirror( - executeAsync(request, new HostHandle().setHost(redirectHost), hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)), attemptCount + 1), - rfuture - ); - } - } else if (errorEntityEx != null) { - rfuture.completeExceptionally(errorEntityEx); - } else { - hostHandler.success(); - rfuture.complete(response); - } - } - } catch (Exception ex) { - rfuture.completeExceptionally(ArangoDBException.of(ex, reqId)); - } - }); - return rfuture; - } - - private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle, InternalRequest request, Host host, - long reqId, int attemptCount, CompletableFuture rfuture) { - IOException ioEx = wrapIOEx(e); - hostHandler.fail(ioEx); - if (hostHandle != null && hostHandle.getHost() != null) { - hostHandle.setHost(null); - } - hostHandler.checkNext(hostHandle, RequestUtils.determineAccessType(request)); - if (isSafe) { - Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request)); - LOGGER.warn("Could not connect to {} while executing request [id={}]", - host.getDescription(), reqId, ioEx); - LOGGER.debug("Try connecting to {}", nextHost.getDescription()); - mirror( - executeAsync(request, hostHandle, nextHost, attemptCount), - rfuture - ); - } else { - ArangoDBException aEx = ArangoDBException.of(ioEx, reqId); - rfuture.completeExceptionally(aEx); - } - } - - private void mirror(CompletableFuture up, CompletableFuture down) { - up.whenComplete((v, err) -> { - if (err != null) { - down.completeExceptionally(err instanceof CompletionException ? err.getCause() : err); - } else { - down.complete(v); - } - }); - } - - private static IOException wrapIOEx(Throwable t) { - if (t instanceof IOException) { - return (IOException) t; - } else { - return new IOException(t); - } - } - - private boolean isSafe(final InternalRequest request) { - RequestType type = request.getRequestType(); - return type == RequestType.GET || type == RequestType.HEAD || type == RequestType.OPTIONS; + protected void connect(Connection conn) throws IOException { + // no-op } } diff --git a/http/src/main/java/com/arangodb/http/HttpConnection.java b/http/src/main/java/com/arangodb/http/HttpConnection.java index d18850d39..1592e0341 100644 --- a/http/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http/src/main/java/com/arangodb/http/HttpConnection.java @@ -221,6 +221,7 @@ private HttpMethod requestTypeToHttpMethod(RequestType requestType) { } } + @Override public CompletableFuture executeAsync(final InternalRequest request) { CompletableFuture rfuture = new CompletableFuture<>(); vertx.runOnContext(e -> doExecute(request, rfuture)); diff --git a/http/src/main/java/com/arangodb/http/HttpProtocol.java b/http/src/main/java/com/arangodb/http/HttpProtocol.java index 67f8cdad3..d3df5cd5b 100644 --- a/http/src/main/java/com/arangodb/http/HttpProtocol.java +++ b/http/src/main/java/com/arangodb/http/HttpProtocol.java @@ -42,8 +42,7 @@ public HttpProtocol(final HttpCommunication httpCommunication) { @Override public CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle) { - return CompletableFuture.completedFuture(null) - .thenCompose(__ -> httpCommunication.executeAsync(request, hostHandle)); + return httpCommunication.executeAsync(request, hostHandle); } @Override diff --git a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java index e6bc1d3b9..fd09f5feb 100644 --- a/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java +++ b/http/src/main/java/com/arangodb/http/HttpProtocolProvider.java @@ -25,7 +25,7 @@ public ConnectionFactory createConnectionFactory() { @Override public CommunicationProtocol createProtocol(ArangoConfig config, HostHandler hostHandler) { - return new HttpProtocol(new HttpCommunication(hostHandler, config)); + return new HttpProtocol(new HttpCommunication(config, hostHandler)); } @Override diff --git a/resilience-tests/src/test/java/resilience/ClusterTest.java b/resilience-tests/src/test/java/resilience/ClusterTest.java index 5bebfc89c..bfe88d6e4 100644 --- a/resilience-tests/src/test/java/resilience/ClusterTest.java +++ b/resilience-tests/src/test/java/resilience/ClusterTest.java @@ -140,14 +140,24 @@ private static void initServerId(Endpoint endpoint) { } protected void enableAllEndpoints() { - for (Endpoint endpoint : endpoints) { - endpoint.enable(); + try { + for (Endpoint endpoint : endpoints) { + endpoint.getProxy().enable(); + } + Thread.sleep(100); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); } } protected void disableAllEndpoints() { - for (Endpoint endpoint : endpoints) { - endpoint.disable(); + try { + for (Endpoint endpoint : endpoints) { + endpoint.getProxy().disable(); + } + Thread.sleep(100); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); } } diff --git a/resilience-tests/src/test/java/resilience/connection/ConnectionTest.java b/resilience-tests/src/test/java/resilience/connection/ConnectionTest.java index 4c16ad6ce..c8a419fd6 100644 --- a/resilience-tests/src/test/java/resilience/connection/ConnectionTest.java +++ b/resilience-tests/src/test/java/resilience/connection/ConnectionTest.java @@ -2,6 +2,7 @@ import com.arangodb.*; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import resilience.SingleServerTest; @@ -110,4 +111,37 @@ void connectionFailAsync(ArangoDBAsync arangoDB) { getEndpoint().enable(); } + @ParameterizedTest(name = "{index}") + @EnumSource(Protocol.class) + void authFail(Protocol protocol) { + ArangoDB adb = new ArangoDB.Builder() + .host(getEndpoint().getHost(), getEndpoint().getPort()) + .protocol(protocol) + .password("wrong") + .build(); + + Throwable thrown = catchThrowable(adb::getVersion); + assertThat(thrown).isInstanceOf(ArangoDBException.class); + ArangoDBException aEx = (ArangoDBException) thrown; + assertThat(aEx.getResponseCode()).isEqualTo(401); + adb.shutdown(); + } + + @ParameterizedTest(name = "{index}") + @EnumSource(Protocol.class) + void authFailAsync(Protocol protocol) { + ArangoDBAsync adb = new ArangoDB.Builder() + .host(getEndpoint().getHost(), getEndpoint().getPort()) + .protocol(protocol) + .password("wrong") + .build() + .async(); + + Throwable thrown = catchThrowable(() -> adb.getVersion().get()).getCause(); + assertThat(thrown).isInstanceOf(ArangoDBException.class); + ArangoDBException aEx = (ArangoDBException) thrown; + assertThat(aEx.getResponseCode()).isEqualTo(401); + adb.shutdown(); + } + } diff --git a/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceNoneClusterTest.java b/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceNoneClusterTest.java index 06691a376..f5a9eccee 100644 --- a/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceNoneClusterTest.java +++ b/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceNoneClusterTest.java @@ -76,7 +76,7 @@ void failover(ArangoDB arangoDB) throws IOException { @ParameterizedTest(name = "{index}") @MethodSource("asyncArangoProvider") - void failoverAsymc(ArangoDBAsync arangoDB) throws IOException { + void failoverAsync(ArangoDBAsync arangoDB) throws IOException { List endpoints = getEndpoints(); endpoints.get(0).getProxy().disable(); @@ -96,8 +96,6 @@ void failoverAsymc(ArangoDBAsync arangoDB) throws IOException { } - // FIXME: this fails for VST - @Disabled @ParameterizedTest(name = "{index}") @MethodSource("arangoProvider") void retryGET(ArangoDB arangoDB) throws IOException, InterruptedException { @@ -147,4 +145,31 @@ void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException { es.shutdown(); } + + @ParameterizedTest(name = "{index}") + @MethodSource("asyncArangoProvider") + void retryPOSTAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException { + List endpoints = getEndpoints(); + + assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); + + // slow down the driver connection + Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000); + Thread.sleep(100); + + ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor(); + es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS); + + Throwable thrown = catchThrowable(() -> serverIdPOST(arangoDB)); + assertThat(thrown).isInstanceOf(ArangoDBException.class); + assertThat(thrown.getCause()).isInstanceOf(IOException.class); + + assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId()); + + toxic.remove(); + enableAllEndpoints(); + es.shutdown(); + } + } diff --git a/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceRoundRobinClusterTest.java b/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceRoundRobinClusterTest.java index a54812e77..195dfe90f 100644 --- a/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceRoundRobinClusterTest.java +++ b/resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceRoundRobinClusterTest.java @@ -7,7 +7,6 @@ import com.arangodb.entity.LoadBalancingStrategy; import eu.rekawek.toxiproxy.model.ToxicDirection; import eu.rekawek.toxiproxy.model.toxic.Latency; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import resilience.ClusterTest; @@ -15,6 +14,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -79,8 +79,6 @@ void failoverAsync(ArangoDBAsync arangoDB) throws IOException { assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); } - // FIXME: this fails for VST - @Disabled @ParameterizedTest(name = "{index}") @MethodSource("arangoProvider") void retryGET(ArangoDB arangoDB) throws IOException, InterruptedException { @@ -98,15 +96,12 @@ void retryGET(ArangoDB arangoDB) throws IOException, InterruptedException { toxic.remove(); enableAllEndpoints(); - Thread.sleep(100); assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); es.shutdown(); } - // FIXME: this fails for VST - @Disabled @ParameterizedTest(name = "{index}") @MethodSource("asyncArangoProvider") void retryGETAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException { @@ -124,7 +119,6 @@ void retryGETAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedExcept toxic.remove(); enableAllEndpoints(); - Thread.sleep(100); assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); @@ -134,9 +128,9 @@ void retryGETAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedExcept @ParameterizedTest(name = "{index}") @MethodSource("arangoProvider") void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException { - List endpoints = getEndpoints(); - for (Endpoint endpoint : endpoints) { - System.out.println(endpoint.getServerId()); + // create VST connections + for (int i = 0; i < getEndpoints().size(); i++) { + arangoDB.getVersion(); } // slow down the driver connection @@ -150,23 +144,23 @@ void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException { assertThat(thrown).isInstanceOf(ArangoDBException.class); assertThat(thrown.getCause()).isInstanceOf(IOException.class); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId()); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(2).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(1).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(2).getServerId()); toxic.remove(); enableAllEndpoints(); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(0).getServerId()); es.shutdown(); } @ParameterizedTest(name = "{index}") @MethodSource("asyncArangoProvider") - void retryPOSTAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException { - List endpoints = getEndpoints(); - for (Endpoint endpoint : endpoints) { - System.out.println(endpoint.getServerId()); + void retryPOSTAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException, ExecutionException { + // create VST connections + for (int i = 0; i < getEndpoints().size(); i++) { + arangoDB.getVersion().get(); } // slow down the driver connection @@ -180,13 +174,13 @@ void retryPOSTAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedExcep assertThat(thrown).isInstanceOf(ArangoDBException.class); assertThat(thrown.getCause()).isInstanceOf(IOException.class); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId()); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(2).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(1).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(2).getServerId()); toxic.remove(); enableAllEndpoints(); - assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId()); + assertThat(serverIdPOST(arangoDB)).isEqualTo(getEndpoints().get(0).getServerId()); es.shutdown(); } diff --git a/resilience-tests/src/test/java/resilience/retry/RetriableCursorTest.java b/resilience-tests/src/test/java/resilience/retry/RetriableCursorTest.java index aa38d498d..7594ae548 100644 --- a/resilience-tests/src/test/java/resilience/retry/RetriableCursorTest.java +++ b/resilience-tests/src/test/java/resilience/retry/RetriableCursorTest.java @@ -35,7 +35,7 @@ static Stream asyncArangoProvider() { @ParameterizedTest(name = "{index}") @MethodSource("arangoProvider") - void retryCursor(ArangoDB arangoDB) throws IOException { + void retryCursor(ArangoDB arangoDB) throws IOException, InterruptedException { try (ArangoCursor cursor = arangoDB.db() .query("for i in 1..2 return i", String.class, @@ -45,6 +45,7 @@ void retryCursor(ArangoDB arangoDB) throws IOException { assertThat(cursor.next()).isEqualTo("1"); assertThat(cursor.hasNext()).isTrue(); Latency toxic = getEndpoint().getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000); + Thread.sleep(100); Throwable thrown = catchThrowable(cursor::next); assertThat(thrown).isInstanceOf(ArangoDBException.class); assertThat(thrown.getCause()).isInstanceOfAny(TimeoutException.class); @@ -66,10 +67,12 @@ void retryCursorAsync(ArangoDBAsync arangoDB) throws IOException, ExecutionExcep assertThat(c1.getResult()).containsExactly("1"); assertThat(c1.hasMore()).isTrue(); Latency toxic = getEndpoint().getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000); + Thread.sleep(100); Throwable thrown = catchThrowable(() -> c1.nextBatch().get()).getCause(); assertThat(thrown).isInstanceOf(ArangoDBException.class); assertThat(thrown.getCause()).isInstanceOfAny(TimeoutException.class); toxic.remove(); + Thread.sleep(100); ArangoCursorAsync c2 = c1.nextBatch().get(); assertThat(c2.getResult()).containsExactly("2"); assertThat(c2.hasMore()).isFalse(); diff --git a/resilience-tests/src/test/java/resilience/retry/RetryClusterTest.java b/resilience-tests/src/test/java/resilience/retry/RetryClusterTest.java index ff07f37d4..ab76f27af 100644 --- a/resilience-tests/src/test/java/resilience/retry/RetryClusterTest.java +++ b/resilience-tests/src/test/java/resilience/retry/RetryClusterTest.java @@ -317,7 +317,6 @@ void notRetryPostOnClosedConnection(Protocol protocol) throws IOException, Inter toxic.remove(); enableAllEndpoints(); - arangoDB.shutdown(); es.shutdown(); } @@ -356,7 +355,6 @@ void notRetryPostOnClosedConnectionAsync(Protocol protocol) throws IOException, toxic.remove(); enableAllEndpoints(); - arangoDB.shutdown(); es.shutdown(); } diff --git a/resilience-tests/src/test/java/resilience/retry/RetryTest.java b/resilience-tests/src/test/java/resilience/retry/RetryTest.java index df2d5cd72..0fce057c4 100644 --- a/resilience-tests/src/test/java/resilience/retry/RetryTest.java +++ b/resilience-tests/src/test/java/resilience/retry/RetryTest.java @@ -224,7 +224,6 @@ void retryGetOnClosedConnection(Protocol protocol) throws IOException, Interrupt assertThat(exceptions.get(2)).isInstanceOf(ConnectException.class); toxic.remove(); - Thread.sleep(100); getEndpoint().enable(); arangoDB.getVersion(); @@ -272,7 +271,6 @@ void retryGetOnClosedConnectionAsync(Protocol protocol) throws IOException, Inte assertThat(exceptions.get(2)).isInstanceOf(ConnectException.class); toxic.remove(); - Thread.sleep(100); getEndpoint().enable(); arangoDB.getVersion().get(); @@ -311,7 +309,6 @@ void notRetryPostOnClosedConnection(Protocol protocol) throws IOException, Inter } toxic.remove(); - Thread.sleep(100); getEndpoint().enable(); arangoDB.db().query("return null", Void.class); @@ -350,7 +347,6 @@ void notRetryPostOnClosedConnectionAsync(Protocol protocol) throws IOException, } toxic.remove(); - Thread.sleep(100); getEndpoint().enable(); arangoDB.db().query("return null", Void.class).get(); diff --git a/resilience-tests/src/test/java/resilience/vstKeepAlive/VstKeepAliveCloseTest.java b/resilience-tests/src/test/java/resilience/vstKeepAlive/VstKeepAliveCloseTest.java index e3d22d12b..533be6d8a 100644 --- a/resilience-tests/src/test/java/resilience/vstKeepAlive/VstKeepAliveCloseTest.java +++ b/resilience-tests/src/test/java/resilience/vstKeepAlive/VstKeepAliveCloseTest.java @@ -44,7 +44,7 @@ void shutDown() { */ @Test @Timeout(10) - void keepAliveCloseAndReconnect() throws IOException { + void keepAliveCloseAndReconnect() throws IOException, InterruptedException { arangoDB.getVersion(); Latency toxic = getEndpoint().getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000); await().until(() -> logs.getLogs() @@ -53,6 +53,7 @@ void keepAliveCloseAndReconnect() throws IOException { .anyMatch(e -> e.getFormattedMessage().contains("Connection unresponsive!"))); toxic.setLatency(0); toxic.remove(); + Thread.sleep(100); arangoDB.getVersion(); } @@ -72,6 +73,7 @@ void keepAliveCloseAndReconnectAsync() throws IOException, ExecutionException, I .anyMatch(e -> e.getFormattedMessage().contains("Connection unresponsive!"))); toxic.setLatency(0); toxic.remove(); + Thread.sleep(100); arangoDB.async().getVersion().get(); } diff --git a/vst/src/main/java/com/arangodb/vst/VstCommunication.java b/vst/src/main/java/com/arangodb/vst/VstCommunication.java index cfb88537d..16a9d16a8 100644 --- a/vst/src/main/java/com/arangodb/vst/VstCommunication.java +++ b/vst/src/main/java/com/arangodb/vst/VstCommunication.java @@ -21,106 +21,50 @@ package com.arangodb.vst; import com.arangodb.ArangoDBException; -import com.arangodb.PackageVersion; import com.arangodb.internal.InternalRequest; import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; -import com.arangodb.internal.net.AccessType; -import com.arangodb.internal.net.Host; -import com.arangodb.internal.net.HostHandle; +import com.arangodb.internal.net.Communication; +import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.HostHandler; -import com.arangodb.internal.serde.InternalSerde; -import com.arangodb.internal.util.RequestUtils; import com.arangodb.internal.util.ResponseUtils; -import com.arangodb.velocypack.VPackSlice; -import com.arangodb.velocypack.exception.VPackParserException; -import com.arangodb.vst.internal.Chunk; -import com.arangodb.vst.internal.Message; -import com.arangodb.vst.internal.VstConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.arangodb.vst.internal.AuthenticationRequest; +import com.arangodb.vst.internal.JwtAuthenticationRequest; +import com.arangodb.vst.internal.VstConnectionAsync; -import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutionException; /** * @author Mark Vollmary */ -public abstract class VstCommunication> implements Closeable { +public final class VstCommunication extends Communication { + private static final String ENCRYPTION_PLAIN = "plain"; + private static final String ENCRYPTION_JWT = "jwt"; - protected static final String ENCRYPTION_PLAIN = "plain"; - protected static final String ENCRYPTION_JWT = "jwt"; - protected static final AtomicLong mId = new AtomicLong(0L); - private static final Logger LOGGER = LoggerFactory.getLogger(VstCommunication.class); - protected final InternalSerde serde; - private static final String X_ARANGO_DRIVER = "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; + private final String user; + private final String password; + private volatile String jwt; - protected final String user; - protected final String password; - protected final Integer chunkSize; - protected final HostHandler hostHandler; - protected volatile String jwt; - - protected VstCommunication(final ArangoConfig config, final HostHandler hostHandler) { + public VstCommunication(final ArangoConfig config, final HostHandler hostHandler) { + super(config, hostHandler); user = config.getUser(); password = config.getPassword(); jwt = config.getJwt(); - serde = config.getInternalSerde(); - chunkSize = config.getChunkSize(); - this.hostHandler = hostHandler; } - @SuppressWarnings("unchecked") - protected synchronized C connect(final HostHandle hostHandle, final AccessType accessType) { - Host host = hostHandler.get(hostHandle, accessType); - while (true) { - if (host == null) { - hostHandler.reset(); - throw new ArangoDBException("Was not able to connect to any host"); - } - final C connection = (C) host.connection(); - if (connection.isOpen()) { - hostHandler.success(); - return connection; - } else { - try { - connection.open(); - hostHandler.success(); - if (jwt != null || user != null) { - tryAuthenticate(connection); - } - if (!connection.isOpen()) { - // see https://github.com/arangodb/arangodb-java-driver/issues/384 - hostHandler.fail(new IOException("The connection is closed.")); - host = hostHandler.get(hostHandle, accessType); - continue; - } - return connection; - } catch (final IOException e) { - hostHandler.fail(e); - if (hostHandle != null && hostHandle.getHost() != null) { - hostHandle.setHost(null); - } - final Host failedHost = host; - host = hostHandler.get(hostHandle, accessType); - if (host != null) { - LOGGER.warn(String.format("Could not connect to %s", failedHost.getDescription()), e); - LOGGER.warn( - String.format("Could not connect to %s or SSL Handshake failed. Try connecting to %s", - failedHost.getDescription(), host.getDescription())); - } else { - LOGGER.error(e.getMessage(), e); - throw ArangoDBException.of(e); - } - } + @Override + protected void connect(Connection conn) throws IOException { + VstConnectionAsync connection = (VstConnectionAsync) conn; + if (!connection.isOpen()) { + connection.open(); + if (jwt != null || user != null) { + tryAuthenticate(connection); } } } - private void tryAuthenticate(final C connection) { + private void tryAuthenticate(final VstConnectionAsync connection) throws IOException { try { authenticate(connection); } catch (final ArangoDBException authException) { @@ -129,69 +73,32 @@ private void tryAuthenticate(final C connection) { } } - protected abstract void authenticate(final C connection); - - @Override - public void close() throws IOException { - hostHandler.close(); - } - - public R execute(final InternalRequest request, final HostHandle hostHandle) { - return execute(request, hostHandle, 0); - } + private void authenticate(final VstConnectionAsync connection) throws IOException { + InternalRequest authRequest; + if (jwt != null) { + authRequest = new JwtAuthenticationRequest(jwt, ENCRYPTION_JWT); + } else { + authRequest = new AuthenticationRequest(user, password != null ? password : "", ENCRYPTION_PLAIN); + } - protected R execute(final InternalRequest request, final HostHandle hostHandle, final int attemptCount) { - final C connection = connect(hostHandle, RequestUtils.determineAccessType(request)); - return execute(request, connection, attemptCount); + InternalResponse response; + try { + response = connection.executeAsync(authRequest).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ArangoDBException.of(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + checkError(response); } - protected abstract R execute(final InternalRequest request, C connection); - protected abstract R execute(final InternalRequest request, C connection, final int attemptCount); - - protected void checkError(final InternalResponse response) { + private void checkError(final InternalResponse response) { ArangoDBException e = ResponseUtils.translateError(serde, response); if (e != null) throw e; } - protected InternalResponse createResponse(final Message message) throws VPackParserException { - final InternalResponse response = serde.deserialize(message.getHead().toByteArray(), InternalResponse.class); - if (message.getBody() != null) { - response.setBody(message.getBody().toByteArray()); - } - return response; - } - - protected final Message createMessage(final InternalRequest request) throws VPackParserException { - request.putHeaderParam("accept", "application/x-velocypack"); - request.putHeaderParam("content-type", "application/x-velocypack"); - request.putHeaderParam("x-arango-driver", X_ARANGO_DRIVER); - final long id = mId.incrementAndGet(); - return new Message(id, serde.serialize(request), request.getBody()); - } - - protected Collection buildChunks(final Message message) { - final Collection chunks = new ArrayList<>(); - final VPackSlice head = message.getHead(); - int size = head.getByteSize(); - final VPackSlice body = message.getBody(); - if (body != null) { - size += body.getByteSize(); - } - final int n = size / chunkSize; - final int numberOfChunks = (size % chunkSize != 0) ? (n + 1) : n; - int off = 0; - for (int i = 0; size > 0; i++) { - final int len = Math.min(chunkSize, size); - final long messageLength = (i == 0 && numberOfChunks > 1) ? size : -1L; - final Chunk chunk = new Chunk(message.getId(), i, numberOfChunks, messageLength, off, len); - size -= len; - off += len; - chunks.add(chunk); - } - return chunks; - } - public void setJwt(String jwt) { this.jwt = jwt; } diff --git a/vst/src/main/java/com/arangodb/vst/VstCommunicationAsync.java b/vst/src/main/java/com/arangodb/vst/VstCommunicationAsync.java deleted file mode 100644 index 8c0aa11e5..000000000 --- a/vst/src/main/java/com/arangodb/vst/VstCommunicationAsync.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * DISCLAIMER - * - * Copyright 2016 ArangoDB GmbH, Cologne, Germany - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Copyright holder is ArangoDB GmbH, Cologne, Germany - */ - -package com.arangodb.vst; - -import com.arangodb.ArangoDBException; -import com.arangodb.config.HostDescription; -import com.arangodb.internal.InternalRequest; -import com.arangodb.internal.InternalResponse; -import com.arangodb.internal.config.ArangoConfig; -import com.arangodb.internal.net.ArangoDBRedirectException; -import com.arangodb.internal.net.HostHandle; -import com.arangodb.internal.net.HostHandler; -import com.arangodb.internal.util.HostUtils; -import com.arangodb.velocypack.exception.VPackException; -import com.arangodb.velocypack.exception.VPackParserException; -import com.arangodb.vst.internal.AuthenticationRequest; -import com.arangodb.vst.internal.JwtAuthenticationRequest; -import com.arangodb.vst.internal.Message; -import com.arangodb.vst.internal.VstConnectionAsync; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; - -/** - * @author Mark Vollmary - */ -public class VstCommunicationAsync extends VstCommunication, VstConnectionAsync> { - - private static final Logger LOGGER = LoggerFactory.getLogger(VstCommunicationAsync.class); - - public VstCommunicationAsync(final ArangoConfig config, final HostHandler hostHandler) { - super(config, hostHandler); - } - - @Override - protected CompletableFuture execute(final InternalRequest request, final VstConnectionAsync connection) { - return execute(request, connection, 0); - } - - @Override - protected CompletableFuture execute(final InternalRequest request, final VstConnectionAsync connection, final int attemptCount) { - final CompletableFuture rfuture = new CompletableFuture<>(); - try { - final Message message = createMessage(request); - send(message, connection).whenComplete((m, ex) -> { - if (m != null) { - final InternalResponse response; - try { - response = createResponse(m); - } catch (final VPackParserException e) { - LOGGER.error(e.getMessage(), e); - rfuture.completeExceptionally(e); - return; - } - - try { - checkError(response); - } catch (final ArangoDBRedirectException e) { - if (attemptCount >= 3) { - rfuture.completeExceptionally(e); - return; - } - final String location = e.getLocation(); - final HostDescription redirectHost = HostUtils.createFromLocation(location); - hostHandler.failIfNotMatch(redirectHost, e); - execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1) - .whenComplete((v, err) -> { - if (v != null) { - rfuture.complete(v); - } else if (err != null) { - rfuture.completeExceptionally(err instanceof CompletionException ? err.getCause() : err); - } else { - rfuture.cancel(true); - } - }); - return; - } catch (ArangoDBException e) { - rfuture.completeExceptionally(e); - } - rfuture.complete(response); - } else if (ex != null) { - Throwable e = ex instanceof CompletionException ? ex.getCause() : ex; - LOGGER.error(e.getMessage(), e); - rfuture.completeExceptionally(e); - } else { - rfuture.cancel(true); - } - }); - } catch (final VPackException e) { - LOGGER.error(e.getMessage(), e); - rfuture.completeExceptionally(e); - } - return rfuture; - } - - private CompletableFuture send(final Message message, final VstConnectionAsync connection) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(String.format("Send Message (id=%s, head=%s, body=%s)", message.getId(), message.getHead(), - message.getBody() != null ? message.getBody() : "{}")); - } - return connection.write(message, buildChunks(message)); - } - - @Override - protected void authenticate(final VstConnectionAsync connection) { - InternalRequest authRequest; - if (jwt != null) { - authRequest = new JwtAuthenticationRequest(jwt, ENCRYPTION_JWT); - } else { - authRequest = new AuthenticationRequest(user, password != null ? password : "", ENCRYPTION_PLAIN); - } - - InternalResponse response; - try { - response = execute(authRequest, connection).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw ArangoDBException.of(e); - } catch (ExecutionException e) { - throw ArangoDBException.of(e.getCause()); - } - checkError(response); - } - -} diff --git a/vst/src/main/java/com/arangodb/vst/VstProtocol.java b/vst/src/main/java/com/arangodb/vst/VstProtocol.java index c7acfb0ec..e943df389 100644 --- a/vst/src/main/java/com/arangodb/vst/VstProtocol.java +++ b/vst/src/main/java/com/arangodb/vst/VstProtocol.java @@ -36,10 +36,10 @@ */ public class VstProtocol implements CommunicationProtocol { - private final VstCommunicationAsync communication; + private final VstCommunication communication; private final ExecutorService outgoingExecutor = Executors.newCachedThreadPool(); - public VstProtocol(final VstCommunicationAsync communication) { + public VstProtocol(final VstCommunication communication) { super(); this.communication = communication; } @@ -52,7 +52,7 @@ public CompletableFuture executeAsync(InternalRequest request, return cf; } return CompletableFuture.completedFuture(null) - .thenComposeAsync(__ -> communication.execute(request, hostHandle), outgoingExecutor); + .thenComposeAsync(__ -> communication.executeAsync(request, hostHandle), outgoingExecutor); } @Override diff --git a/vst/src/main/java/com/arangodb/vst/VstProtocolProvider.java b/vst/src/main/java/com/arangodb/vst/VstProtocolProvider.java index 6d8eb0fa5..033748522 100644 --- a/vst/src/main/java/com/arangodb/vst/VstProtocolProvider.java +++ b/vst/src/main/java/com/arangodb/vst/VstProtocolProvider.java @@ -21,7 +21,7 @@ public ConnectionFactory createConnectionFactory() { @Override public CommunicationProtocol createProtocol(ArangoConfig config, HostHandler hostHandler) { - return new VstProtocol(new VstCommunicationAsync(config, hostHandler)); + return new VstProtocol(new VstCommunication(config, hostHandler)); } @Override diff --git a/vst/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java b/vst/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java index ace6e21a7..b740efa9f 100644 --- a/vst/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java +++ b/vst/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java @@ -20,22 +20,42 @@ package com.arangodb.vst.internal; +import com.arangodb.PackageVersion; import com.arangodb.config.HostDescription; +import com.arangodb.internal.InternalRequest; +import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.serde.InternalSerde; +import com.arangodb.velocypack.VPackSlice; +import com.arangodb.velocypack.exception.VPackException; +import com.arangodb.velocypack.exception.VPackParserException; import com.arangodb.vst.internal.utils.CompletableFutureUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * @author Mark Vollmary */ public class VstConnectionAsync extends VstConnection> { + private final static Logger LOGGER = LoggerFactory.getLogger(VstConnectionAsync.class); + private static final AtomicLong mId = new AtomicLong(0L); + private static final String X_ARANGO_DRIVER = "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; + private final Integer chunkSize; + private final InternalSerde serde; + public VstConnectionAsync(final ArangoConfig config, final HostDescription host) { super(config, host); + chunkSize = config.getChunkSize(); + serde = config.getInternalSerde(); } @Override @@ -63,4 +83,79 @@ protected void doKeepAlive() { sendKeepAlive().join(); } + @Override + public CompletableFuture executeAsync(final InternalRequest request) { + // TODO: refactor using Future composition + final CompletableFuture rfuture = new CompletableFuture<>(); + try { + final Message message = createMessage(request); + send(message).whenComplete((m, ex) -> { + if (m != null) { + final InternalResponse response; + try { + response = createResponse(m); + } catch (final VPackParserException e) { + rfuture.completeExceptionally(e); + return; + } + rfuture.complete(response); + } else { + Throwable e = ex instanceof CompletionException ? ex.getCause() : ex; + rfuture.completeExceptionally(e); + } + }); + } catch (final VPackException e) { + LOGGER.error(e.getMessage(), e); + rfuture.completeExceptionally(e); + } + return rfuture; + } + + private Message createMessage(final InternalRequest request) throws VPackParserException { + request.putHeaderParam("accept", "application/x-velocypack"); + request.putHeaderParam("content-type", "application/x-velocypack"); + request.putHeaderParam("x-arango-driver", X_ARANGO_DRIVER); + final long id = mId.incrementAndGet(); + return new Message(id, serde.serialize(request), request.getBody()); + } + + private CompletableFuture send(final Message message) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(String.format("Send Message (id=%s, head=%s, body=%s)", + message.getId(), + serde.toJsonString(message.getHead().toByteArray()), + message.getBody() != null ? serde.toJsonString(message.getBody().toByteArray()) : "{}")); + } + return write(message, buildChunks(message)); + } + + private Collection buildChunks(final Message message) { + final Collection chunks = new ArrayList<>(); + final VPackSlice head = message.getHead(); + int size = head.getByteSize(); + final VPackSlice body = message.getBody(); + if (body != null) { + size += body.getByteSize(); + } + final int n = size / chunkSize; + final int numberOfChunks = (size % chunkSize != 0) ? (n + 1) : n; + int off = 0; + for (int i = 0; size > 0; i++) { + final int len = Math.min(chunkSize, size); + final long messageLength = (i == 0 && numberOfChunks > 1) ? size : -1L; + final Chunk chunk = new Chunk(message.getId(), i, numberOfChunks, messageLength, off, len); + size -= len; + off += len; + chunks.add(chunk); + } + return chunks; + } + + private InternalResponse createResponse(final Message message) throws VPackParserException { + final InternalResponse response = serde.deserialize(message.getHead().toByteArray(), InternalResponse.class); + if (message.getBody() != null) { + response.setBody(message.getBody().toByteArray()); + } + return response; + } }