diff --git a/src/main/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceException.java b/src/main/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceException.java new file mode 100644 index 00000000..d4f00c02 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceException.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import java.io.IOException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +/** + * IncompatibleDatasourceException corresponding to the {@link RestStatus#BAD_REQUEST} status code + * + * The exception is thrown when a user tries to update datasource with new endpoint which is not compatible + * with current datasource + */ +public class IncompatibleDatasourceException extends OpenSearchException { + + public IncompatibleDatasourceException(String msg, Object... args) { + super(msg, args); + } + + public IncompatibleDatasourceException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + + public IncompatibleDatasourceException(StreamInput in) throws IOException { + super(in); + } + + @Override + public final RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java index 77abae84..f9ba73ec 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java @@ -31,15 +31,15 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name")); + final UpdateDatasourceRequest updateDatasourceRequest = new UpdateDatasourceRequest(request.param("name")); if (request.hasContentOrSourceParam()) { try (XContentParser parser = request.contentOrSourceParamParser()) { - PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); + UpdateDatasourceRequest.PARSER.parse(parser, updateDatasourceRequest, null); } } return channel -> client.executeLocally( UpdateDatasourceAction.INSTANCE, - putDatasourceRequest, + updateDatasourceRequest, new RestToXContentListener<>(channel) ); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java index fdc0b357..41a7fd90 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java @@ -33,8 +33,8 @@ @Log4j2 @EqualsAndHashCode(callSuper = false) public class UpdateDatasourceRequest extends AcknowledgedRequest { - private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); - private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); private static final int MAX_DATASOURCE_NAME_BYTES = 255; /** * @param name the datasource name diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index a9b933ca..31c50007 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -21,6 +21,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; @@ -149,7 +150,7 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request, List fields = datasourceUpdateService.getHeaderFields(request.getEndpoint()); if (datasource.isCompatible(fields) == false) { - throw new OpenSearchException( + throw new IncompatibleDatasourceException( "new fields [{}] does not contain all old fields [{}]", fields.toString(), datasource.getDatabase().getFields().toString() diff --git a/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java b/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java index f9449a0e..89e2819b 100644 --- a/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java +++ b/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java @@ -110,9 +110,9 @@ protected static void deletePipeline(String name) throws IOException { client().performRequest(request); } - protected Response createDatasource(final String name, Map properties) throws IOException { + protected Response createDatasource(final String name, Map properties) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - for (Map.Entry config : properties.entrySet()) { + for (Map.Entry config : properties.entrySet()) { builder.field(config.getKey(), config.getValue()); } builder.endObject(); @@ -149,16 +149,31 @@ protected Response deleteDatasource(final String name) throws IOException { return client().performRequest(request); } + protected Response deleteDatasource(final String name, final int retry) throws Exception { + for (int i = 0; i < retry; i++) { + try { + Request request = new Request(DELETE, buildDatasourcePath(name)); + return client().performRequest(request); + } catch (Exception e) { + if (i + 1 == retry) { + throw e; + } + Thread.sleep(1000); + } + } + throw new RuntimeException("should not reach here"); + } + protected Map getDatasource(final String name) throws Exception { Request request = new Request(GET, buildDatasourcePath(name)); Response response = client().performRequest(request); return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map(); } - protected Response updateDatasource(final String name, Map config) throws IOException { + protected Response updateDatasource(final String name, Map properties) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - if (config != null && !config.isEmpty()) { - builder.value(config); + for (Map.Entry config : properties.entrySet()) { + builder.field(config.getKey(), config.getValue()); } builder.endObject(); diff --git a/src/test/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceExceptionTests.java b/src/test/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceExceptionTests.java new file mode 100644 index 00000000..9d03b10c --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/exceptions/IncompatibleDatasourceExceptionTests.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import lombok.SneakyThrows; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +public class IncompatibleDatasourceExceptionTests extends OpenSearchTestCase { + public void testConstructor_whenCreated_thenSucceed() { + IncompatibleDatasourceException exception = new IncompatibleDatasourceException( + "New datasource is not compatible with existing datasource" + ); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + public void testConstructor_whenCreatedWithRootCause_thenSucceed() { + IncompatibleDatasourceException exception = new IncompatibleDatasourceException( + "New datasource is not compatible with existing datasource", + new RuntimeException() + ); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + @SneakyThrows + public void testConstructor_whenCreatedWithStream_thenSucceed() { + IncompatibleDatasourceException exception = new IncompatibleDatasourceException( + "New datasource is not compatible with existing datasource" + ); + + BytesStreamOutput output = new BytesStreamOutput(); + exception.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + IncompatibleDatasourceException copiedException = new IncompatibleDatasourceException(input); + assertEquals(exception.getMessage(), copiedException.getMessage()); + assertEquals(exception.status(), copiedException.status()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/exceptions/ResourceInUseExceptionTests.java b/src/test/java/org/opensearch/geospatial/exceptions/ResourceInUseExceptionTests.java new file mode 100644 index 00000000..00465840 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/exceptions/ResourceInUseExceptionTests.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import lombok.SneakyThrows; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +public class ResourceInUseExceptionTests extends OpenSearchTestCase { + public void testConstructor_whenCreated_thenSucceed() { + ResourceInUseException exception = new ResourceInUseException("Resource is in use"); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + public void testConstructor_whenCreatedWithRootCause_thenSucceed() { + ResourceInUseException exception = new ResourceInUseException("Resource is in use", new RuntimeException()); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + @SneakyThrows + public void testConstructor_whenCreatedWithStream_thenSucceed() { + ResourceInUseException exception = new ResourceInUseException("New datasource is not compatible with existing datasource"); + + BytesStreamOutput output = new BytesStreamOutput(); + exception.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + ResourceInUseException copiedException = new ResourceInUseException(input); + assertEquals(exception.getMessage(), copiedException.getMessage()); + assertEquals(exception.status(), copiedException.status()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java index 6af74414..4227e7c6 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java @@ -41,11 +41,34 @@ public void testPrepareRequest_whenValidInput_thenSucceed() { AtomicBoolean isExecuted = new AtomicBoolean(false); verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { - assertTrue(actionRequest instanceof PutDatasourceRequest); - PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; - assertEquals("https://test.com", putDatasourceRequest.getEndpoint()); - assertEquals(TimeValue.timeValueDays(1), putDatasourceRequest.getUpdateInterval()); - assertEquals(datasourceName, putDatasourceRequest.getName()); + assertTrue(actionRequest instanceof UpdateDatasourceRequest); + UpdateDatasourceRequest updateDatasourceRequest = (UpdateDatasourceRequest) actionRequest; + assertEquals("https://test.com", updateDatasourceRequest.getEndpoint()); + assertEquals(TimeValue.timeValueDays(1), updateDatasourceRequest.getUpdateInterval()); + assertEquals(datasourceName, updateDatasourceRequest.getName()); + isExecuted.set(true); + return null; + }); + + dispatchRequest(request); + assertTrue(isExecuted.get()); + } + + public void testPrepareRequest_whenNullInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String content = "{}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(String.format(Locale.ROOT, path, datasourceName)) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + AtomicBoolean isExecuted = new AtomicBoolean(false); + + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof UpdateDatasourceRequest); + UpdateDatasourceRequest updateDatasourceRequest = (UpdateDatasourceRequest) actionRequest; + assertNull(updateDatasourceRequest.getEndpoint()); + assertNull(updateDatasourceRequest.getUpdateInterval()); + assertEquals(datasourceName, updateDatasourceRequest.getName()); isExecuted.set(true); return null; }); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceIT.java new file mode 100644 index 00000000..706ea6c5 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceIT.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import lombok.SneakyThrows; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.client.ResponseException; +import org.opensearch.geospatial.GeospatialRestTestCase; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoDataServer; +import org.opensearch.rest.RestStatus; + +public class UpdateDatasourceIT extends GeospatialRestTestCase { + // Use this value in resource name to avoid name conflict among tests + private static final String PREFIX = UpdateDatasourceIT.class.getSimpleName().toLowerCase(Locale.ROOT); + + @BeforeClass + public static void start() { + Ip2GeoDataServer.start(); + } + + @AfterClass + public static void stop() { + Ip2GeoDataServer.stop(); + } + + @SneakyThrows + public void testUpdateDatasource_whenValidInput_thenUpdated() { + boolean isDatasourceCreated = false; + String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); + try { + Map datasourceProperties = Map.of( + PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), + Ip2GeoDataServer.getEndpointCountry() + ); + + // Create datasource and wait for it to be available + createDatasource(datasourceName, datasourceProperties); + isDatasourceCreated = true; + waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10)); + + int updateIntervalInDays = 1; + updateDatasourceEndpoint(datasourceName, Ip2GeoDataServer.getEndpointCity(), updateIntervalInDays); + List> datasources = (List>) getDatasource(datasourceName).get("datasources"); + + assertEquals(Ip2GeoDataServer.getEndpointCity(), datasources.get(0).get("endpoint")); + assertEquals(updateIntervalInDays, datasources.get(0).get("update_interval_in_days")); + } finally { + if (isDatasourceCreated) { + deleteDatasource(datasourceName, 3); + } + } + } + + @SneakyThrows + public void testUpdateDatasource_whenIncompatibleFields_thenFails() { + boolean isDatasourceCreated = false; + String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); + try { + Map datasourceProperties = Map.of( + PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), + Ip2GeoDataServer.getEndpointCity() + ); + + // Create datasource and wait for it to be available + createDatasource(datasourceName, datasourceProperties); + isDatasourceCreated = true; + waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10)); + + // Update should fail as country data does not have every fields that city data has + int updateIntervalInDays = 1; + ResponseException exception = expectThrows( + ResponseException.class, + () -> updateDatasourceEndpoint(datasourceName, Ip2GeoDataServer.getEndpointCountry(), updateIntervalInDays) + ); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); + } finally { + if (isDatasourceCreated) { + deleteDatasource(datasourceName, 3); + } + } + } + + private void updateDatasourceEndpoint(final String datasourceName, final String endpoint, final int updateInterval) throws IOException { + Map properties = Map.of( + UpdateDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), + endpoint, + UpdateDatasourceRequest.UPDATE_INTERVAL_IN_DAYS_FIELD.getPreferredName(), + updateInterval + ); + updateDatasource(datasourceName, properties); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index 2d3a5602..76763fb1 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -27,6 +27,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.jobscheduler.spi.LockModel; @@ -197,7 +198,7 @@ public void testDoExecute_whenIncompatibleFields_thenError() { // Verify ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener).onFailure(exceptionCaptor.capture()); - assertEquals(OpenSearchException.class, exceptionCaptor.getValue().getClass()); + assertEquals(IncompatibleDatasourceException.class, exceptionCaptor.getValue().getClass()); exceptionCaptor.getValue().getMessage().contains("does not contain"); verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java index 9c59039c..bf447a5a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java @@ -25,8 +25,11 @@ import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoDataServer; import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest; +import org.opensearch.rest.RestStatus; public class Ip2GeoProcessorIT extends GeospatialRestTestCase { + // Use this value in resource name to avoid name conflict among tests + private static final String PREFIX = Ip2GeoProcessorIT.class.getSimpleName().toLowerCase(Locale.ROOT); private static final String CITY = "city"; private static final String COUNTRY = "country"; private static final String IP = "ip"; @@ -34,7 +37,7 @@ public class Ip2GeoProcessorIT extends GeospatialRestTestCase { @SneakyThrows public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() { - String pipelineName = GeospatialTestHelper.randomLowerCaseString(); + String pipelineName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); // Run ResponseException exception = expectThrows( @@ -44,25 +47,35 @@ public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() { // Verify assertTrue(exception.getMessage().contains("doesn't exist")); - assertEquals(400, exception.getResponse().getStatusLine().getStatusCode()); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); } @SneakyThrows public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { Ip2GeoDataServer.start(); + boolean isDatasourceCreated = false; + boolean isProcessorCreated = false; + String pipelineName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); + String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); try { - String pipelineName = GeospatialTestHelper.randomLowerCaseString(); - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); String targetField = GeospatialTestHelper.randomLowerCaseString(); String field = GeospatialTestHelper.randomLowerCaseString(); - Map datasourceProperties = Map.of( + Map datasourceProperties = Map.of( PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), Ip2GeoDataServer.getEndpointCity() ); // Create datasource and wait for it to be available createDatasource(datasourceName, datasourceProperties); + isDatasourceCreated = true; + // Creation of datasource with same name should fail + ResponseException createException = expectThrows( + ResponseException.class, + () -> createDatasource(datasourceName, datasourceProperties) + ); + // Verify + assertEquals(RestStatus.BAD_REQUEST.getStatus(), createException.getResponse().getStatusLine().getStatusCode()); waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10)); Map processorProperties = Map.of( @@ -78,6 +91,7 @@ public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { // Create ip2geo processor createIp2GeoProcessorPipeline(pipelineName, processorProperties); + isProcessorCreated = true; Map> sampleData = getSampleData(); List docs = sampleData.entrySet() @@ -96,21 +110,31 @@ public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { }); // Delete datasource fails when there is a process using it - ResponseException exception = expectThrows(ResponseException.class, () -> deleteDatasource(datasourceName)); + ResponseException deleteException = expectThrows(ResponseException.class, () -> deleteDatasource(datasourceName)); // Verify - assertEquals(400, exception.getResponse().getStatusLine().getStatusCode()); - - // Delete resources - deletePipeline(pipelineName); - deleteDatasource(datasourceName); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), deleteException.getResponse().getStatusLine().getStatusCode()); } finally { + Exception exception = null; + try { + if (isProcessorCreated) { + deletePipeline(pipelineName); + } + if (isDatasourceCreated) { + deleteDatasource(datasourceName, 3); + } + } catch (Exception e) { + exception = e; + } Ip2GeoDataServer.stop(); + if (exception != null) { + throw exception; + } } } private Response createIp2GeoProcessorPipeline(final String pipelineName, final Map properties) throws IOException { String field = GeospatialTestHelper.randomLowerCaseString(); - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String datasourceName = PREFIX + GeospatialTestHelper.randomLowerCaseString(); Map defaultProperties = Map.of( Ip2GeoProcessor.CONFIG_FIELD, field,