Skip to content

Commit

Permalink
Added retry logic if client request returns an error code that is con…
Browse files Browse the repository at this point in the history
…figured for retrying

Closes #4408

Added additional tests for coverage

Included example of comma separated values for retry-error-codes config parameter

Simplified retry loop logic

Moved retry configurations into constructor and removed setters

Signed-off-by: Allan Clements <[email protected]>
  • Loading branch information
criminosis committed Apr 26, 2024
1 parent 0d601cf commit d75cfe6
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 12 deletions.
4 changes: 4 additions & 0 deletions docs/configs/janusgraph-cfg.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Elasticsearch index configuration
| index.[X].elasticsearch.enable_index_names_cache | Enables cache for generated index store names. It is recommended to always enable index store names cache unless you have more then 50000 indexes per index store. | Boolean | true | MASKABLE |
| index.[X].elasticsearch.health-request-timeout | When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the ES cluster health to reach at least yellow status. This string should be formatted as a natural number followed by the lowercase letter "s", e.g. 3s or 60s. | String | 30s | MASKABLE |
| index.[X].elasticsearch.interface | Interface for connecting to Elasticsearch. TRANSPORT_CLIENT and NODE were previously supported, but now are required to migrate to REST_CLIENT. See the JanusGraph upgrade instructions for more details. | String | REST_CLIENT | MASKABLE |
| index.[X].elasticsearch.retry-error-codes | Comma separated list of Elasticsearch REST client ResponseException error codes to retry. E.g. "408,429" | String[] | | LOCAL |
| index.[X].elasticsearch.retry-initial-wait | Sets the initial retry wait time (in milliseconds) before exponential backoff. | Long | 1 | LOCAL |
| index.[X].elasticsearch.retry-limit | Sets the number of attempts for configured retryable error codes. | Integer | 0 | LOCAL |
| index.[X].elasticsearch.retry-max-wait | Sets the max retry wait time (in milliseconds). | Long | 1000 | LOCAL |
| index.[X].elasticsearch.retry_on_conflict | Specify how many times should the operation be retried when a conflict occurs. | Integer | 0 | MASKABLE |
| index.[X].elasticsearch.scroll-keep-alive | How long (in seconds) elasticsearch should keep alive the scroll context. | Integer | 60 | GLOBAL_OFFLINE |
| index.[X].elasticsearch.setup-max-open-scroll-contexts | Whether JanusGraph should setup max_open_scroll_context to maximum value for the cluster or not. | Boolean | true | MASKABLE |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,26 @@ public class ElasticSearchIndex implements IndexProvider {
"Sets the maximum socket timeout (in milliseconds).", ConfigOption.Type.MASKABLE,
Integer.class, RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS);

public static final ConfigOption<Integer> RETRY_LIMIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-limit",
"Sets the number of attempts for configured retryable error codes.", ConfigOption.Type.LOCAL,
Integer.class, 0);

public static final ConfigOption<Long> RETRY_INITIAL_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-initial-wait",
"Sets the initial retry wait time (in milliseconds) before exponential backoff.",
ConfigOption.Type.LOCAL, Long.class, 1L);

public static final ConfigOption<Long> RETRY_MAX_WAIT =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-max-wait",
"Sets the max retry wait time (in milliseconds).", ConfigOption.Type.LOCAL,
Long.class, 1000L);

public static final ConfigOption<String[]> RETRY_ERROR_CODES =
new ConfigOption<>(ELASTICSEARCH_NS, "retry-error-codes",
"Comma separated list of Elasticsearch REST client ResponseException error codes to retry. " +
"E.g. \"408,429\"", ConfigOption.Type.LOCAL, String[].class, new String[0]);

public static final int HOST_PORT_DEFAULT = 9200;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_PORT;
Expand Down Expand Up @@ -73,7 +76,13 @@ public ElasticSearchClient connect(Configuration config) throws IOException {
final int scrollKeepAlive = config.get(ElasticSearchIndex.ES_SCROLL_KEEP_ALIVE);
Preconditions.checkArgument(scrollKeepAlive >= 1, "Scroll keep-alive should be greater than or equal to 1");
final boolean useMappingTypesForES7 = config.get(ElasticSearchIndex.USE_MAPPING_FOR_ES7);
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
int retryLimit = config.getOrDefault(ElasticSearchIndex.RETRY_LIMIT);
long retryInitialWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_INITIAL_WAIT);
long retryMaxWaitMs = config.getOrDefault(ElasticSearchIndex.RETRY_MAX_WAIT);
Set<Integer> errorCodesToRetry = Arrays.stream(config.getOrDefault(ElasticSearchIndex.RETRY_ERROR_CODES))
.mapToInt(Integer::parseInt).boxed().collect(Collectors.toSet());
final RestElasticSearchClient client = getElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7,
retryLimit, errorCodesToRetry, retryInitialWaitMs, retryMaxWaitMs);
if (config.has(ElasticSearchIndex.BULK_REFRESH)) {
client.setBulkRefresh(config.get(ElasticSearchIndex.BULK_REFRESH));
}
Expand Down Expand Up @@ -104,8 +113,11 @@ protected RestClientBuilder getRestClientBuilder(HttpHost[] hosts) {
return RestClient.builder(hosts);
}

protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7);
protected RestElasticSearchClient getElasticSearchClient(RestClient rc, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
return new RestElasticSearchClient(rc, scrollKeepAlive, useMappingTypesForES7, retryAttemptLimit, retryOnErrorCodes,
retryInitialWaitMs, retryMaxWaitMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,14 +112,28 @@ public class RestElasticSearchClient implements ElasticSearchClient {
private Integer retryOnConflict;

private final String retryOnConflictKey;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7) {

private final int retryAttemptLimit;

private final Set<Integer> retryOnErrorCodes;

private final long retryInitialWaitMs;

private final long retryMaxWaitMs;

public RestElasticSearchClient(RestClient delegate, int scrollKeepAlive, boolean useMappingTypesForES7,
int retryAttemptLimit, Set<Integer> retryOnErrorCodes, long retryInitialWaitMs,
long retryMaxWaitMs) {
this.delegate = delegate;
majorVersion = getMajorVersion();
this.scrollKeepAlive = scrollKeepAlive+"s";
esVersion7 = ElasticMajorVersion.SEVEN.equals(majorVersion);
useMappingTypes = majorVersion.getValue() < 7 || (useMappingTypesForES7 && esVersion7);
retryOnConflictKey = majorVersion.getValue() >= 7 ? "retry_on_conflict" : "_retry_on_conflict";
this.retryAttemptLimit = retryAttemptLimit;
this.retryOnErrorCodes = Collections.unmodifiableSet(retryOnErrorCodes);
this.retryInitialWaitMs = retryInitialWaitMs;
this.retryMaxWaitMs = retryMaxWaitMs;
}

@Override
Expand Down Expand Up @@ -546,13 +562,35 @@ private Response performRequest(String method, String path, byte[] requestData)
return performRequest(new Request(method, path), requestData);
}

private Response performRequestWithRetry(Request request) throws IOException {
int retryCount = 0;
while (true) {
try {
return delegate.performRequest(request);
} catch (ResponseException e) {
if (!retryOnErrorCodes.contains(e.getResponse().getStatusLine().getStatusCode()) || retryCount >= retryAttemptLimit) {
throw e;
}
//Wait before trying again
long waitDurationMs = Math.min((long) (retryInitialWaitMs * Math.pow(10, retryCount)), retryMaxWaitMs);
log.warn("Retrying Elasticsearch request in {} ms. Attempt {} of {}", waitDurationMs, retryCount, retryAttemptLimit);
try {
Thread.sleep(waitDurationMs);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(String.format("Thread interrupted while waiting for retry attempt %d of %d", retryCount, retryAttemptLimit), interruptedException);

Check warning on line 580 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L580

Avoid throwing raw exception types.

Check warning on line 580 in janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java

View check run for this annotation

Codecov / codecov/patch

janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java#L579-L580

Added lines #L579 - L580 were not covered by tests
}
}
retryCount++;
}
}

private Response performRequest(Request request, byte[] requestData) throws IOException {

final HttpEntity entity = requestData != null ? new ByteArrayEntity(requestData, ContentType.APPLICATION_JSON) : null;

request.setEntity(entity);

final Response response = delegate.performRequest(request);
final Response response = performRequestWithRetry(request);

if (response.getStatusLine().getStatusCode() >= 400) {
throw new IOException("Error executing request: " + response.getStatusLine().getReasonPhrase());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.diskstorage.es.rest;

import com.google.common.collect.Sets;
import org.apache.http.StatusLine;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class RestClientRetryTest {
@Mock
private RestClient restClientMock;

@Mock
private ResponseException responseException;

@Mock
private Response response;

@Mock
private StatusLine statusLine;

@Captor
private ArgumentCaptor<Request> requestCaptor;

RestElasticSearchClient createClient(int retryAttemptLimit, Set<Integer> retryErrorCodes) throws IOException {
//Just throw an exception when there's an attempt to look up the ES version during instantiation
when(restClientMock.performRequest(any())).thenThrow(new IOException());

RestElasticSearchClient clientUnderTest = new RestElasticSearchClient(restClientMock, 0, false,
retryAttemptLimit, retryErrorCodes, 0, 0);
//There's an initial query to get the ES version we need to accommodate, and then reset for the actual test
Mockito.reset(restClientMock);
return clientUnderTest;
}

@Test
public void testRetryOnConfiguredErrorStatus() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
//Just throw an expected exception the second time to confirm the retry occurred
//rather than mock out a parsable response
IOException expectedFinalException = new IOException("Expected");

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException)
.thenThrow(expectedFinalException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception actualException) {
Assertions.assertSame(expectedFinalException, actualException);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testRetriesExhaustedReturnsLastRetryException() throws IOException {
Integer retryCode = 429;
int expectedNumberOfRequestAttempts = 2;
doReturn(retryCode).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
ResponseException initialRetryException = mock(ResponseException.class);
doReturn(response).when(initialRetryException).getResponse();

try (RestElasticSearchClient restClientUnderTest = createClient(expectedNumberOfRequestAttempts - 1,
Sets.newHashSet(retryCode))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
//first throw a different retry exception instance, then make sure it's the latter one
//that was retained and then thrown
.thenThrow(initialRetryException)
.thenThrow(responseException);


restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception after retry");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(expectedNumberOfRequestAttempts)).performRequest(requestCaptor.capture());
}

@Test
public void testNonRetryErrorCodeException() throws IOException {
doReturn(503).when(statusLine).getStatusCode();
doReturn(statusLine).when(response).getStatusLine();
doReturn(response).when(responseException).getResponse();
try (RestElasticSearchClient restClientUnderTest = createClient(0,
//Other retry error code is configured
Sets.newHashSet(429))) {
//prime the restClientMock again after it's reset after creation
when(restClientMock.performRequest(any()))
.thenThrow(responseException);
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(responseException, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}

@Test
public void testNonResponseExceptionErrorThrown() throws IOException {
IOException differentExceptionType = new IOException();
when(restClientMock.performRequest(any()))
.thenThrow(differentExceptionType);
try (RestElasticSearchClient restClientUnderTest = createClient(0, Collections.emptySet())) {
restClientUnderTest.bulkRequest(Collections.emptyList(), null);
Assertions.fail("Should have thrown the expected exception");
} catch (Exception e) {
Assertions.assertSame(differentExceptionType, e);
}
verify(restClientMock, times(1)).performRequest(requestCaptor.capture());
}
}
Loading

0 comments on commit d75cfe6

Please sign in to comment.