diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index f1e0fda5..40882492 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -15,22 +15,16 @@ import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; -import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; -import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; -import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.tasks.Task; @@ -42,7 +36,6 @@ */ @Log4j2 public class PutDatasourceTransportAction extends HandledTransportAction { - private final Client client; private final ThreadPool threadPool; private final DatasourceFacade datasourceFacade; private final DatasourceUpdateService datasourceUpdateService; @@ -51,20 +44,19 @@ public class PutDatasourceTransportAction extends HandledTransportAction listener) throws IOException { Datasource datasource = Datasource.Builder.build(request); - IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME) - .id(datasource.getName()) - .source(datasource.toXContent(JsonXContent.contentBuilder(), null)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .opType(DocWriteRequest.OpType.CREATE); - client.index(indexRequest, getIndexResponseListener(datasource, listener)); + datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, listener)); } @VisibleForTesting diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index 99b98e57..71d8122b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -21,6 +21,8 @@ import java.util.Objects; import java.util.stream.Collectors; +import javax.swing.*; + import lombok.extern.log4j.Log4j2; import org.opensearch.OpenSearchException; @@ -34,7 +36,6 @@ import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; import org.opensearch.action.get.MultiGetResponse; -import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; @@ -129,12 +130,30 @@ private String getIndexMapping() { */ public IndexResponse updateDatasource(final Datasource datasource) throws IOException { datasource.setLastUpdateTime(Instant.now()); - IndexRequestBuilder requestBuilder = client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME); - requestBuilder.setId(datasource.getName()); - requestBuilder.setOpType(DocWriteRequest.OpType.INDEX); - requestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - requestBuilder.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); - return client.index(requestBuilder.request()).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)); + return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setOpType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute() + .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)); + } + + /** + * Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * + * @param datasource the datasource + * @param listener the listener + * @throws IOException exception + */ + public void putDatasource(final Datasource datasource, final ActionListener listener) throws IOException { + datasource.setLastUpdateTime(Instant.now()); + client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setOpType(DocWriteRequest.OpType.CREATE) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute(listener); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index bc78fd0b..3e902c61 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -14,7 +14,6 @@ import lombok.extern.log4j.Log4j2; import org.opensearch.action.ActionListener; -import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -53,7 +52,6 @@ public static DatasourceRunner getJobRunnerInstance() { } private ClusterService clusterService; - private Client client; private DatasourceUpdateService datasourceUpdateService; private Ip2GeoExecutor ip2GeoExecutor; private DatasourceFacade datasourceFacade; @@ -68,13 +66,11 @@ private DatasourceRunner() { */ public void initialize( final ClusterService clusterService, - final Client client, final DatasourceUpdateService datasourceUpdateService, final Ip2GeoExecutor ip2GeoExecutor, final DatasourceFacade datasourceFacade ) { this.clusterService = clusterService; - this.client = client; this.datasourceUpdateService = datasourceUpdateService; this.ip2GeoExecutor = ip2GeoExecutor; this.datasourceFacade = datasourceFacade; diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 0f27993d..98423a7a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -22,7 +22,6 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.opensearch.OpenSearchException; -import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -35,19 +34,16 @@ public class DatasourceUpdateService { private final ClusterService clusterService; private final ClusterSettings clusterSettings; - private final Client client; private final DatasourceFacade datasourceFacade; private final GeoIpDataFacade geoIpDataFacade; public DatasourceUpdateService( final ClusterService clusterService, - final Client client, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade ) { this.clusterService = clusterService; this.clusterSettings = clusterService.getClusterSettings(); - this.client = client; this.datasourceFacade = datasourceFacade; this.geoIpDataFacade = geoIpDataFacade; } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index a0f8974d..ab810fb1 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -26,7 +26,6 @@ import lombok.extern.log4j.Log4j2; import org.opensearch.action.ActionListener; -import org.opensearch.client.Client; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -52,7 +51,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor { private final Set properties; private final boolean ignoreMissing; private final boolean firstOnly; - private final Client client; private final ClusterSettings clusterSettings; private final DatasourceFacade datasourceFacade; private final GeoIpDataFacade geoIpDataFacade; @@ -72,8 +70,9 @@ public final class Ip2GeoProcessor extends AbstractProcessor { * @param properties the properties * @param ignoreMissing true if documents with a missing value for the field should be ignored * @param firstOnly true if only first result should be returned in case of array - * @param client the client * @param clusterSettings the cluster settings + * @param datasourceFacade the datasource facade + * @param geoIpDataFacade the geoip data facade */ public Ip2GeoProcessor( final String tag, @@ -84,7 +83,6 @@ public Ip2GeoProcessor( final Set properties, final boolean ignoreMissing, final boolean firstOnly, - final Client client, final ClusterSettings clusterSettings, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade @@ -96,7 +94,6 @@ public Ip2GeoProcessor( this.properties = properties; this.ignoreMissing = ignoreMissing; this.firstOnly = firstOnly; - this.client = client; this.clusterSettings = clusterSettings; this.datasourceFacade = datasourceFacade; this.geoIpDataFacade = geoIpDataFacade; @@ -318,7 +315,6 @@ public String getType() { * Ip2Geo processor factory */ public static final class Factory implements Processor.Factory { - private final Client client; private final IngestService ingestService; private final DatasourceFacade datasourceFacade; private final GeoIpDataFacade geoIpDataFacade; @@ -326,16 +322,11 @@ public static final class Factory implements Processor.Factory { /** * Default constructor * - * @param client the client * @param ingestService the ingest service + * @param datasourceFacade the datasource facade + * @param geoIpDataFacade the geoip data facade */ - public Factory( - final Client client, - final IngestService ingestService, - final DatasourceFacade datasourceFacade, - final GeoIpDataFacade geoIpDataFacade - ) { - this.client = client; + public Factory(final IngestService ingestService, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade) { this.ingestService = ingestService; this.datasourceFacade = datasourceFacade; this.geoIpDataFacade = geoIpDataFacade; @@ -380,7 +371,6 @@ public Ip2GeoProcessor create( propertyNames == null ? null : new HashSet<>(propertyNames), ignoreMissing, firstOnly, - client, ingestService.getClusterService().getClusterSettings(), datasourceFacade, geoIpDataFacade diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 3a21189e..ff54aed6 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -92,7 +92,6 @@ public Map getProcessors(Processor.Parameters paramet .put( Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory( - parameters.client, parameters.ingestService, new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()), new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client) @@ -129,19 +128,13 @@ public Collection createComponents( ) { GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client); DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService); - DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService( - clusterService, - client, - datasourceFacade, - geoIpDataFacade - ); + DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceFacade, geoIpDataFacade); Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool); /** * We don't need to return datasource runner because it is used only by job scheduler and job scheduler * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. */ - DatasourceRunner.getJobRunnerInstance() - .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); + DatasourceRunner.getJobRunnerInstance().initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); return List.of(UploadStats.getInstance(), datasourceUpdateService, datasourceFacade, ip2GeoExecutor, geoIpDataFacade); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java index 62613581..3d255cd2 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java @@ -13,21 +13,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import lombok.SneakyThrows; + import org.junit.Before; import org.mockito.ArgumentCaptor; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; -import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.unit.TimeValue; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; -import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.tasks.Task; @@ -36,31 +34,16 @@ public class PutDatasourceTransportActionTests extends Ip2GeoTestCase { @Before public void init() { - action = new PutDatasourceTransportAction( - transportService, - actionFilters, - verifyingClient, - threadPool, - datasourceFacade, - datasourceUpdateService - ); + action = new PutDatasourceTransportAction(transportService, actionFilters, threadPool, datasourceFacade, datasourceUpdateService); } - public void testDoExecute_whenValidInput_thenSucceed() throws Exception { + @SneakyThrows + public void testDoExecute_whenValidInput_thenSucceed() { Task task = mock(Task.class); - PutDatasourceRequest request = new PutDatasourceRequest("test"); + PutDatasourceRequest request = new PutDatasourceRequest(GeospatialTestHelper.randomLowerCaseString()); request.setEndpoint(sampleManifestUrl()); request.setUpdateInterval(TimeValue.timeValueDays(1)); ActionListener listener = mock(ActionListener.class); - verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { - assertTrue(actionRequest instanceof IndexRequest); - IndexRequest indexRequest = (IndexRequest) actionRequest; - assertEquals(DatasourceExtension.JOB_INDEX_NAME, indexRequest.index()); - assertEquals(request.getName(), indexRequest.id()); - assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, indexRequest.getRefreshPolicy()); - assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType()); - return null; - }); // Run action.doExecute(task, request, listener); @@ -71,13 +54,21 @@ public void testDoExecute_whenValidInput_thenSucceed() throws Exception { // Run captor.getValue().onResponse(null); - // Verify - verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class)); + ArgumentCaptor datasourceCaptor = ArgumentCaptor.forClass(Datasource.class); + ArgumentCaptor actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).putDatasource(datasourceCaptor.capture(), actionListenerCaptor.capture()); + assertEquals(request.getName(), datasourceCaptor.getValue().getName()); + assertEquals(request.getEndpoint(), datasourceCaptor.getValue().getEndpoint()); + assertEquals(request.getUpdateInterval().days(), datasourceCaptor.getValue().getSchedule().getInterval()); + + // Run next listener.onResponse + actionListenerCaptor.getValue().onResponse(null); + // Verify verify(listener).onResponse(new AcknowledgedResponse(true)); } - public void testIndexResponseListenerFailure() { + public void testGetIndexResponseListener_whenVersionConflict_thenFailure() { Datasource datasource = new Datasource(); ActionListener listener = mock(ActionListener.class); action.getIndexResponseListener(datasource, listener) @@ -91,7 +82,8 @@ public void testIndexResponseListenerFailure() { verify(listener).onFailure(any(ResourceAlreadyExistsException.class)); } - public void testCreateDatasourceInvalidState() throws Exception { + @SneakyThrows + public void testCreateDatasource_whenInvalidState_thenUpdateStateAsFailed() { Datasource datasource = new Datasource(); datasource.setState(randomStateExcept(DatasourceState.CREATING)); datasource.getUpdateStats().setLastFailedAt(null); @@ -105,7 +97,8 @@ public void testCreateDatasourceInvalidState() throws Exception { verify(datasourceFacade).updateDatasource(datasource); } - public void testCreateDatasourceWithException() throws Exception { + @SneakyThrows + public void testCreateDatasource_whenExceptionHappens_thenUpdateStateAsFailed() { Datasource datasource = new Datasource(); doThrow(new RuntimeException()).when(datasourceUpdateService).updateOrCreateGeoIpData(datasource); @@ -118,7 +111,8 @@ public void testCreateDatasourceWithException() throws Exception { verify(datasourceFacade).updateDatasource(datasource); } - public void testCreateDatasource() throws Exception { + @SneakyThrows + public void testCreateDatasource_whenValidInput_thenUpdateStateAsCreating() { Datasource datasource = new Datasource(); // Run diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index bac7f7dd..25f37b74 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -19,6 +19,8 @@ import java.util.Arrays; import java.util.List; +import lombok.SneakyThrows; + import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -144,6 +146,26 @@ public void testUpdateDatasource_whenValidInput_thenSucceed() throws Exception { assertTrue(previousTime.isBefore(datasource.getLastUpdateTime())); } + @SneakyThrows + public void testPutDatasource_whenValidInpu_thenSucceed() { + Datasource datasource = randomDatasource(); + Instant previousTime = Instant.now().minusMillis(1); + datasource.setLastUpdateTime(previousTime); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof IndexRequest); + IndexRequest indexRequest = (IndexRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, indexRequest.index()); + assertEquals(datasource.getName(), indexRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, indexRequest.getRefreshPolicy()); + assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType()); + return null; + }); + + datasourceFacade.putDatasource(datasource, mock(ActionListener.class)); + assertTrue(previousTime.isBefore(datasource.getLastUpdateTime())); + } + public void testGetDatasource_whenException_thenNull() throws Exception { Datasource datasource = setupClientForGetRequest(true, new IndexNotFoundException(DatasourceExtension.JOB_INDEX_NAME)); assertNull(datasourceFacade.getDatasource(datasource.getName())); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index 34e4ab3b..f32d0ab4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -29,8 +29,7 @@ public class DatasourceRunnerTests extends Ip2GeoTestCase { @Before public void init() { - DatasourceRunner.getJobRunnerInstance() - .initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); + DatasourceRunner.getJobRunnerInstance().initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); } public void testRunJobInvalidClass() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 8d78f44b..91dc6c09 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -35,7 +35,7 @@ public class DatasourceUpdateServiceTests extends Ip2GeoTestCase { @Before public void init() { - datasourceUpdateService = new DatasourceUpdateService(clusterService, client, datasourceFacade, geoIpDataFacade); + datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceFacade, geoIpDataFacade); } public void testUpdateDatasourceSkip() throws Exception { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index 6028b029..0e1cf1c2 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -47,7 +47,7 @@ public class Ip2GeoProcessorTests extends Ip2GeoTestCase { @Before public void init() { - factory = new Ip2GeoProcessor.Factory(client, ingestService, datasourceFacade, geoIpDataFacade); + factory = new Ip2GeoProcessor.Factory(ingestService, datasourceFacade, geoIpDataFacade); } public void testCreateWithNoDatasource() {