Skip to content

Commit

Permalink
Add integration test for UpdateDatasource API (#307)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored May 16, 2023
1 parent 521487b commit c77c5af
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* IncompatibleDatasourceException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*
* The exception is thrown when a user tries to update datasource with new endpoint which is not compatible
* with current datasource
*/
public class IncompatibleDatasourceException extends OpenSearchException {

public IncompatibleDatasourceException(String msg, Object... args) {
super(msg, args);
}

public IncompatibleDatasourceException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public IncompatibleDatasourceException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name"));
final UpdateDatasourceRequest updateDatasourceRequest = new UpdateDatasourceRequest(request.param("name"));
if (request.hasContentOrSourceParam()) {
try (XContentParser parser = request.contentOrSourceParamParser()) {
PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null);
UpdateDatasourceRequest.PARSER.parse(parser, updateDatasourceRequest, null);
}
}
return channel -> client.executeLocally(
UpdateDatasourceAction.INSTANCE,
putDatasourceRequest,
updateDatasourceRequest,
new RestToXContentListener<>(channel)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
@Log4j2
@EqualsAndHashCode(callSuper = false)
public class UpdateDatasourceRequest extends AcknowledgedRequest<UpdateDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
/**
* @param name the datasource name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
Expand Down Expand Up @@ -149,7 +150,7 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request,

List<String> fields = datasourceUpdateService.getHeaderFields(request.getEndpoint());
if (datasource.isCompatible(fields) == false) {
throw new OpenSearchException(
throw new IncompatibleDatasourceException(
"new fields [{}] does not contain all old fields [{}]",
fields.toString(),
datasource.getDatabase().getFields().toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ protected static void deletePipeline(String name) throws IOException {
client().performRequest(request);
}

protected Response createDatasource(final String name, Map<String, String> properties) throws IOException {
protected Response createDatasource(final String name, Map<String, Object> properties) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, String> config : properties.entrySet()) {
for (Map.Entry<String, Object> config : properties.entrySet()) {
builder.field(config.getKey(), config.getValue());
}
builder.endObject();
Expand Down Expand Up @@ -149,16 +149,31 @@ protected Response deleteDatasource(final String name) throws IOException {
return client().performRequest(request);
}

protected Response deleteDatasource(final String name, final int retry) throws Exception {
for (int i = 0; i < retry; i++) {
try {
Request request = new Request(DELETE, buildDatasourcePath(name));
return client().performRequest(request);
} catch (Exception e) {
if (i + 1 == retry) {
throw e;
}
Thread.sleep(1000);
}
}
throw new RuntimeException("should not reach here");
}

protected Map<String, Object> getDatasource(final String name) throws Exception {
Request request = new Request(GET, buildDatasourcePath(name));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected Response updateDatasource(final String name, Map<String, Object> config) throws IOException {
protected Response updateDatasource(final String name, Map<String, Object> properties) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
if (config != null && !config.isEmpty()) {
builder.value(config);
for (Map.Entry<String, Object> config : properties.entrySet()) {
builder.field(config.getKey(), config.getValue());
}
builder.endObject();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import lombok.SneakyThrows;

import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;

public class IncompatibleDatasourceExceptionTests extends OpenSearchTestCase {
public void testConstructor_whenCreated_thenSucceed() {
IncompatibleDatasourceException exception = new IncompatibleDatasourceException(
"New datasource is not compatible with existing datasource"
);
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

public void testConstructor_whenCreatedWithRootCause_thenSucceed() {
IncompatibleDatasourceException exception = new IncompatibleDatasourceException(
"New datasource is not compatible with existing datasource",
new RuntimeException()
);
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

@SneakyThrows
public void testConstructor_whenCreatedWithStream_thenSucceed() {
IncompatibleDatasourceException exception = new IncompatibleDatasourceException(
"New datasource is not compatible with existing datasource"
);

BytesStreamOutput output = new BytesStreamOutput();
exception.writeTo(output);
BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes);
IncompatibleDatasourceException copiedException = new IncompatibleDatasourceException(input);
assertEquals(exception.getMessage(), copiedException.getMessage());
assertEquals(exception.status(), copiedException.status());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import lombok.SneakyThrows;

import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;

public class ResourceInUseExceptionTests extends OpenSearchTestCase {
public void testConstructor_whenCreated_thenSucceed() {
ResourceInUseException exception = new ResourceInUseException("Resource is in use");
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

public void testConstructor_whenCreatedWithRootCause_thenSucceed() {
ResourceInUseException exception = new ResourceInUseException("Resource is in use", new RuntimeException());
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

@SneakyThrows
public void testConstructor_whenCreatedWithStream_thenSucceed() {
ResourceInUseException exception = new ResourceInUseException("New datasource is not compatible with existing datasource");

BytesStreamOutput output = new BytesStreamOutput();
exception.writeTo(output);
BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes);
ResourceInUseException copiedException = new ResourceInUseException(input);
assertEquals(exception.getMessage(), copiedException.getMessage());
assertEquals(exception.status(), copiedException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,34 @@ public void testPrepareRequest_whenValidInput_thenSucceed() {
AtomicBoolean isExecuted = new AtomicBoolean(false);

verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> {
assertTrue(actionRequest instanceof PutDatasourceRequest);
PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest;
assertEquals("https://test.com", putDatasourceRequest.getEndpoint());
assertEquals(TimeValue.timeValueDays(1), putDatasourceRequest.getUpdateInterval());
assertEquals(datasourceName, putDatasourceRequest.getName());
assertTrue(actionRequest instanceof UpdateDatasourceRequest);
UpdateDatasourceRequest updateDatasourceRequest = (UpdateDatasourceRequest) actionRequest;
assertEquals("https://test.com", updateDatasourceRequest.getEndpoint());
assertEquals(TimeValue.timeValueDays(1), updateDatasourceRequest.getUpdateInterval());
assertEquals(datasourceName, updateDatasourceRequest.getName());
isExecuted.set(true);
return null;
});

dispatchRequest(request);
assertTrue(isExecuted.get());
}

public void testPrepareRequest_whenNullInput_thenSucceed() {
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
String content = "{}";
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
.withPath(String.format(Locale.ROOT, path, datasourceName))
.withContent(new BytesArray(content), XContentType.JSON)
.build();
AtomicBoolean isExecuted = new AtomicBoolean(false);

verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> {
assertTrue(actionRequest instanceof UpdateDatasourceRequest);
UpdateDatasourceRequest updateDatasourceRequest = (UpdateDatasourceRequest) actionRequest;
assertNull(updateDatasourceRequest.getEndpoint());
assertNull(updateDatasourceRequest.getUpdateInterval());
assertEquals(datasourceName, updateDatasourceRequest.getName());
isExecuted.set(true);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import lombok.SneakyThrows;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opensearch.client.ResponseException;
import org.opensearch.geospatial.GeospatialRestTestCase;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoDataServer;
import org.opensearch.rest.RestStatus;

public class UpdateDatasourceIT extends GeospatialRestTestCase {
// Use this value in resource name to avoid name conflict among tests
private static final String PREFIX = UpdateDatasourceIT.class.getSimpleName().toLowerCase(Locale.ROOT);

@BeforeClass
public static void start() {
Ip2GeoDataServer.start();
}

@AfterClass
public static void stop() {
Ip2GeoDataServer.stop();
}

@SneakyThrows
public void testUpdateDatasource_whenValidInput_thenUpdated() {
boolean isDatasourceCreated = false;
String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString();
try {
Map<String, Object> datasourceProperties = Map.of(
PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(),
Ip2GeoDataServer.getEndpointCountry()
);

// Create datasource and wait for it to be available
createDatasource(datasourceName, datasourceProperties);
isDatasourceCreated = true;
waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10));

int updateIntervalInDays = 1;
updateDatasourceEndpoint(datasourceName, Ip2GeoDataServer.getEndpointCity(), updateIntervalInDays);
List<Map<String, Object>> datasources = (List<Map<String, Object>>) getDatasource(datasourceName).get("datasources");

assertEquals(Ip2GeoDataServer.getEndpointCity(), datasources.get(0).get("endpoint"));
assertEquals(updateIntervalInDays, datasources.get(0).get("update_interval_in_days"));
} finally {
if (isDatasourceCreated) {
deleteDatasource(datasourceName, 3);
}
}
}

@SneakyThrows
public void testUpdateDatasource_whenIncompatibleFields_thenFails() {
boolean isDatasourceCreated = false;
String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString();
try {
Map<String, Object> datasourceProperties = Map.of(
PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(),
Ip2GeoDataServer.getEndpointCity()
);

// Create datasource and wait for it to be available
createDatasource(datasourceName, datasourceProperties);
isDatasourceCreated = true;
waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10));

// Update should fail as country data does not have every fields that city data has
int updateIntervalInDays = 1;
ResponseException exception = expectThrows(
ResponseException.class,
() -> updateDatasourceEndpoint(datasourceName, Ip2GeoDataServer.getEndpointCountry(), updateIntervalInDays)
);
assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode());
} finally {
if (isDatasourceCreated) {
deleteDatasource(datasourceName, 3);
}
}
}

private void updateDatasourceEndpoint(final String datasourceName, final String endpoint, final int updateInterval) throws IOException {
Map<String, Object> properties = Map.of(
UpdateDatasourceRequest.ENDPOINT_FIELD.getPreferredName(),
endpoint,
UpdateDatasourceRequest.UPDATE_INTERVAL_IN_DAYS_FIELD.getPreferredName(),
updateInterval
);
updateDatasource(datasourceName, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.jobscheduler.spi.LockModel;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void testDoExecute_whenIncompatibleFields_thenError() {
// Verify
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener).onFailure(exceptionCaptor.capture());
assertEquals(OpenSearchException.class, exceptionCaptor.getValue().getClass());
assertEquals(IncompatibleDatasourceException.class, exceptionCaptor.getValue().getClass());
exceptionCaptor.getValue().getMessage().contains("does not contain");
verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class));
}
Expand Down
Loading

0 comments on commit c77c5af

Please sign in to comment.