diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index 43e0f76c..f1e0fda5 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -8,6 +8,7 @@ package org.opensearch.geospatial.ip2geo.action; +import java.io.IOException; import java.time.Instant; import lombok.extern.log4j.Log4j2; @@ -15,6 +16,7 @@ import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; @@ -71,18 +73,26 @@ public PutDatasourceTransportAction( @Override protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { try { - Datasource datasource = Datasource.Builder.build(request); - IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME) - .id(datasource.getName()) - .source(datasource.toXContent(JsonXContent.contentBuilder(), null)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .opType(DocWriteRequest.OpType.CREATE); - client.index(indexRequest, getIndexResponseListener(datasource, listener)); + StepListener createIndexStep = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(createIndexStep); + createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception)); } catch (Exception e) { listener.onFailure(e); } } + @VisibleForTesting + protected void putDatasource(final PutDatasourceRequest request, final ActionListener listener) + throws IOException { + Datasource datasource = Datasource.Builder.build(request); + IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME) + .id(datasource.getName()) + .source(datasource.toXContent(JsonXContent.contentBuilder(), null)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .opType(DocWriteRequest.OpType.CREATE); + client.index(indexRequest, getIndexResponseListener(datasource, listener)); + } + @VisibleForTesting protected ActionListener getIndexResponseListener( final Datasource datasource, diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index c3be7fef..99b98e57 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -8,18 +8,28 @@ package org.opensearch.geospatial.ip2geo.common; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; import org.opensearch.OpenSearchException; +import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; @@ -29,7 +39,9 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; @@ -49,12 +61,64 @@ @Log4j2 public class DatasourceFacade { private static final Integer MAX_SIZE = 1000; + private static final Tuple INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1); + private static final Tuple INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all"); + private static final Tuple INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true); private final Client client; + private final ClusterService clusterService; private final ClusterSettings clusterSettings; - public DatasourceFacade(final Client client, final ClusterSettings clusterSettings) { + public DatasourceFacade(final Client client, final ClusterService clusterService) { this.client = client; - this.clusterSettings = clusterSettings; + this.clusterService = clusterService; + this.clusterSettings = clusterService.getClusterSettings(); + } + + /** + * Create a datasource index of single shard with auto expand replicas to all nodes + * + * We want the index to expand to all replica so that datasource query request can be executed locally + * for faster ingestion time. + */ + public void createIndexIfNotExists(final StepListener stepListener) { + if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) { + stepListener.onResponse(null); + return; + } + final Map indexSettings = new HashMap<>(); + indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2()); + indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2()); + indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2()); + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping()) + .settings(indexSettings); + client.admin().indices().create(createIndexRequest, new ActionListener<>() { + @Override + public void onResponse(final CreateIndexResponse createIndexResponse) { + stepListener.onResponse(null); + } + + @Override + public void onFailure(final Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME); + stepListener.onResponse(null); + return; + } + stepListener.onFailure(e); + } + }); + } + + private String getIndexMapping() { + try { + try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().map(String::trim).collect(Collectors.joining()); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java index 92650020..dd498d1b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java @@ -115,7 +115,7 @@ public void createIndexIfNotExists(final String indexName) { */ private String getIndexMapping() { try { - try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) { + try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_geoip.json")) { try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { return reader.lines().map(String::trim).collect(Collectors.joining()); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 7c7eae55..d56f5184 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -182,7 +182,6 @@ public class Datasource implements Writeable, ScheduledJobParameter { PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD); - } @VisibleForTesting diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 07ea55c7..0f27993d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -210,7 +210,8 @@ private String setupIndex(final DatasourceManifest manifest, final Datasource da * @return */ private boolean shouldUpdate(final Datasource datasource, final DatasourceManifest manifest) { - if (datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) { + if (datasource.getDatabase().getUpdatedAt() != null + && datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) { return false; } diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 085fab98..37104c69 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -94,7 +94,7 @@ public Map getProcessors(Processor.Parameters paramet new Ip2GeoProcessor.Factory( parameters.client, parameters.ingestService, - new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService().getClusterSettings()), + new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()), new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client) ) ) @@ -128,7 +128,7 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client); - DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService.getClusterSettings()); + DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService); DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService( clusterService, client, diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json index 3179ef0d..3f3d5aa1 100644 --- a/src/main/resources/mappings/ip2geo_datasource.json +++ b/src/main/resources/mappings/ip2geo_datasource.json @@ -1,9 +1,77 @@ { - "dynamic": false, - "properties": { - "_cidr": { - "type": "ip_range", - "doc_values": false + "properties" : { + "database" : { + "properties" : { + "fields" : { + "type" : "text" + }, + "sha256_hash" : { + "type" : "text" + }, + "provider" : { + "type" : "text" + }, + "updated_at_in_epoch_millis" : { + "type" : "long" + }, + "valid_for_in_days" : { + "type" : "long" + } + } + }, + "enabled_time" : { + "type" : "long" + }, + "endpoint" : { + "type" : "text" + }, + "name" : { + "type" : "text" + }, + "indices" : { + "type" : "text" + }, + "last_update_time" : { + "type" : "long" + }, + "schedule" : { + "properties" : { + "interval" : { + "properties" : { + "period" : { + "type" : "long" + }, + "start_time" : { + "type" : "long" + }, + "unit" : { + "type" : "text" + } + } + } + } + }, + "state" : { + "type" : "text" + }, + "update_enabled" : { + "type" : "boolean" + }, + "update_stats" : { + "properties" : { + "last_failed_at_in_epoch_millis" : { + "type" : "long" + }, + "last_processing_time_in_millis" : { + "type" : "long" + }, + "last_skipped_at_in_epoch_millis" : { + "type" : "long" + }, + "last_succeeded_at_in_epoch_millis" : { + "type" : "long" + } + } } } -} +} \ No newline at end of file diff --git a/src/main/resources/mappings/ip2geo_geoip.json b/src/main/resources/mappings/ip2geo_geoip.json new file mode 100644 index 00000000..3179ef0d --- /dev/null +++ b/src/main/resources/mappings/ip2geo_geoip.json @@ -0,0 +1,9 @@ +{ + "dynamic": false, + "properties": { + "_cidr": { + "type": "ip_range", + "doc_values": false + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 48451a9c..7163c7cd 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -233,7 +233,11 @@ public void doE Request request, ActionListener listener ) { - listener.onResponse((Response) executeVerifier.get().apply(action, request)); + try { + listener.onResponse((Response) executeVerifier.get().apply(action, request)); + } catch (Exception e) { + listener.onFailure(e); + } } /** diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index ec8d2c9b..2334cf0e 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -156,7 +156,7 @@ public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { String domain = GeospatialTestHelper.randomLowerCaseString(); PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); - request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1)); + request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29) + 1)); // Run BytesStreamOutput output = new BytesStreamOutput(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java index ff542418..62613581 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java @@ -14,9 +14,11 @@ import static org.mockito.Mockito.verify; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -44,7 +46,7 @@ public void init() { ); } - public void testDoExecute() throws Exception { + public void testDoExecute_whenValidInput_thenSucceed() throws Exception { Task task = mock(Task.class); PutDatasourceRequest request = new PutDatasourceRequest("test"); request.setEndpoint(sampleManifestUrl()); @@ -59,7 +61,18 @@ public void testDoExecute() throws Exception { assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType()); return null; }); + + // Run action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(StepListener.class); + verify(datasourceFacade).createIndexIfNotExists(captor.capture()); + + // Run + captor.getValue().onResponse(null); + + // Verify verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class)); verify(listener).onResponse(new AcknowledgedResponse(true)); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index 9f114751..bac7f7dd 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -17,14 +17,16 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; @@ -36,8 +38,6 @@ import org.opensearch.action.support.WriteRequest; import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -54,10 +54,70 @@ public class DatasourceFacadeTests extends Ip2GeoTestCase { @Before public void init() { - datasourceFacade = new DatasourceFacade( - verifyingClient, - new ClusterSettings(Settings.EMPTY, new HashSet<>(Ip2GeoSettings.settings())) + datasourceFacade = new DatasourceFacade(verifyingClient, clusterService); + } + + public void testCreateIndexIfNotExists_whenIndexExist_thenCreateRequestIsNotCalled() { + when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(true); + + // Verify + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { throw new RuntimeException("Shouldn't get called"); }); + + // Run + StepListener stepListener = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(stepListener); + + // Verify stepListener is called + stepListener.result(); + } + + public void testCreateIndexIfNotExists_whenIndexExist_thenCreateRequestIsCalled() { + when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false); + + // Verify + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof CreateIndexRequest); + CreateIndexRequest request = (CreateIndexRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals("1", request.settings().get("index.number_of_shards")); + assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); + assertEquals("true", request.settings().get("index.hidden")); + assertNotNull(request.mappings()); + return null; + }); + + // Run + StepListener stepListener = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(stepListener); + + // Verify stepListener is called + stepListener.result(); + } + + public void testCreateIndexIfNotExists_whenIndexCreatedAlready_thenExceptionIsIgnored() { + when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false); + verifyingClient.setExecuteVerifier( + (actionResponse, actionRequest) -> { throw new ResourceAlreadyExistsException(DatasourceExtension.JOB_INDEX_NAME); } ); + + // Run + StepListener stepListener = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(stepListener); + + // Verify stepListener is called + stepListener.result(); + } + + public void testCreateIndexIfNotExists_whenExceptionIsThrown_thenExceptionIsThrown() { + when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { throw new RuntimeException(); }); + + // Run + StepListener stepListener = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(stepListener); + + // Verify stepListener is called + expectThrows(RuntimeException.class, () -> stepListener.result()); } public void testUpdateDatasource_whenValidInput_thenSucceed() throws Exception { @@ -106,7 +166,7 @@ public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithData verify(listener).onResponse(eq(datasource)); } - public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithNull() { + public void testGetDatasource_whenNotExistWithListener_thenListenerIsCalledWithNull() { Datasource datasource = setupClientForGetRequest(false, null); ActionListener listener = mock(ActionListener.class); datasourceFacade.getDatasource(datasource.getName(), listener); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java index 18fc8628..75dea954 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java @@ -20,7 +20,7 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; public class DatasourceExtensionTests extends Ip2GeoTestCase { - public void testBasic() throws Exception { + public void testBasic() { DatasourceExtension extension = new DatasourceExtension(); assertEquals("scheduler_geospatial_ip2geo_datasource", extension.getJobType()); assertEquals(JOB_INDEX_NAME, extension.getJobIndex());