diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 2acdfd52..27055d41 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -20,9 +20,8 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; -import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; -import org.opensearch.ingest.IngestMetadata; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -36,6 +35,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream()) - .filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName())) - .findAny() - .isEmpty(); - } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacade.java new file mode 100644 index 00000000..0f1b764f --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacade.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; +import org.opensearch.ingest.IngestMetadata; +import org.opensearch.ingest.IngestService; + +public class Ip2GeoProcessorFacade { + private final IngestService ingestService; + + @Inject + public Ip2GeoProcessorFacade(final IngestService ingestService) { + this.ingestService = ingestService; + } + + public List getProcessors(final String datasourceName) { + IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE); + if (ingestMetadata == null) { + return Collections.emptyList(); + } + return ingestMetadata.getPipelines() + .keySet() + .stream() + .flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream()) + .filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasourceName)) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 83383158..b2c7e678 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -47,7 +47,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor { public static final String CONFIG_FIELD = "field"; public static final String CONFIG_TARGET_FIELD = "target_field"; public static final String CONFIG_DATASOURCE = "datasource"; - public static final String CONFIG_PROPERTIES = "target_field"; + public static final String CONFIG_PROPERTIES = "properties"; public static final String CONFIG_IGNORE_MISSING = "ignore_missing"; public static final String CONFIG_FIRST_ONLY = "first_only"; diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 25775a39..2484b335 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -16,6 +16,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Locale; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -46,9 +47,11 @@ import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; import org.opensearch.ingest.IngestMetadata; import org.opensearch.ingest.IngestService; import org.opensearch.jobscheduler.spi.LockModel; @@ -86,6 +89,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase { protected TransportService transportService; @Mock protected Ip2GeoLockService ip2GeoLockService; + @Mock + protected Ip2GeoProcessorFacade ip2GeoProcessorFacade; protected IngestMetadata ingestMetadata; protected NoOpNodeClient client; protected VerifyingClient verifyingClient; @@ -211,6 +216,28 @@ protected LockModel randomLockModel() { return lockModel; } + protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) { + String tag = GeospatialTestHelper.randomLowerCaseString(); + String description = GeospatialTestHelper.randomLowerCaseString(); + String field = GeospatialTestHelper.randomLowerCaseString(); + String targetField = GeospatialTestHelper.randomLowerCaseString(); + Set properties = Set.of(GeospatialTestHelper.randomLowerCaseString()); + Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor( + tag, + description, + field, + targetField, + datasourceName, + properties, + true, + true, + clusterSettings, + datasourceFacade, + geoIpDataFacade + ); + return ip2GeoProcessor; + } + /** * Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167) * is merged in OpenSearch core diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 535e5422..54f9ed61 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -15,14 +15,9 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; import lombok.SneakyThrows; @@ -32,15 +27,10 @@ import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; -import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; -import org.opensearch.ingest.IngestMetadata; -import org.opensearch.ingest.PipelineConfiguration; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -49,7 +39,14 @@ public class DeleteDatasourceTransportActionTests extends Ip2GeoTestCase { @Before public void init() { - action = new DeleteDatasourceTransportAction(transportService, actionFilters, ip2GeoLockService, ingestService, datasourceFacade); + action = new DeleteDatasourceTransportAction( + transportService, + actionFilters, + ip2GeoLockService, + ingestService, + datasourceFacade, + ip2GeoProcessorFacade + ); } @SneakyThrows @@ -113,6 +110,7 @@ public void testDeleteDatasource_whenNull_thenThrowException() { public void testDeleteDatasource_whenSafeToDelete_thenDelete() { Datasource datasource = randomDatasource(); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(Collections.emptyList()); // Run action.deleteDatasource(datasource.getName()); @@ -128,14 +126,8 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti Datasource datasource = randomDatasource(); datasource.setState(DatasourceState.AVAILABLE); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); - - String pipelineId = GeospatialTestHelper.randomLowerCaseString(); - Map pipelines = new HashMap<>(); - pipelines.put(pipelineId, createPipelineConfiguration()); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); - when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn( - Arrays.asList(createIp2GeoProcessor(datasource.getName())) + when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn( + Arrays.asList(randomIp2GeoProcessor(datasource.getName())) ); // Run @@ -152,15 +144,9 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE Datasource datasource = randomDatasource(); datasource.setState(DatasourceState.AVAILABLE); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); - - String pipelineId = GeospatialTestHelper.randomLowerCaseString(); - Map pipelines = new HashMap<>(); - pipelines.put(pipelineId, createPipelineConfiguration()); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); - when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn( + when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn( Collections.emptyList(), - Arrays.asList(createIp2GeoProcessor(datasource.getName())) + Arrays.asList(randomIp2GeoProcessor(datasource.getName())) ); // Run @@ -170,33 +156,4 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE verify(datasourceFacade, times(2)).updateDatasource(datasource); verify(datasourceFacade, never()).deleteDatasource(datasource); } - - private PipelineConfiguration createPipelineConfiguration() { - String id = GeospatialTestHelper.randomLowerCaseString(); - ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII)); - BytesReference config = BytesReference.fromByteBuffer(byteBuffer); - return new PipelineConfiguration(id, config, XContentType.JSON); - } - - private Ip2GeoProcessor createIp2GeoProcessor(String datasourceName) { - String tag = GeospatialTestHelper.randomLowerCaseString(); - String description = GeospatialTestHelper.randomLowerCaseString(); - String field = GeospatialTestHelper.randomLowerCaseString(); - String targetField = GeospatialTestHelper.randomLowerCaseString(); - Set properties = Set.of(GeospatialTestHelper.randomLowerCaseString()); - Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor( - tag, - description, - field, - targetField, - datasourceName, - properties, - true, - true, - clusterSettings, - datasourceFacade, - geoIpDataFacade - ); - return ip2GeoProcessor; - } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacadeTests.java new file mode 100644 index 00000000..05fed2ca --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoProcessorFacadeTests.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; +import org.opensearch.ingest.IngestMetadata; +import org.opensearch.ingest.PipelineConfiguration; + +public class Ip2GeoProcessorFacadeTests extends Ip2GeoTestCase { + private Ip2GeoProcessorFacade ip2GeoProcessorFacade; + + @Before + public void init() { + ip2GeoProcessorFacade = new Ip2GeoProcessorFacade(ingestService); + } + + public void testGetProcessors_whenNullMetadata_thenReturnEmpty() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(null); + + List ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName); + assertTrue(ip2GeoProcessorList.isEmpty()); + } + + public void testGetProcessors_whenNoProcessorForGivenDatasource_thenReturnEmpty() { + String datasourceBeingUsed = GeospatialTestHelper.randomLowerCaseString(); + String datasourceNotBeingUsed = GeospatialTestHelper.randomLowerCaseString(); + String pipelineId = GeospatialTestHelper.randomLowerCaseString(); + Map pipelines = new HashMap<>(); + pipelines.put(pipelineId, createPipelineConfiguration()); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); + Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceBeingUsed); + when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor)); + + List ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceNotBeingUsed); + assertTrue(ip2GeoProcessorList.isEmpty()); + } + + public void testGetProcessors_whenProcessorsForGivenDatasource_thenReturnProcessors() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String pipelineId = GeospatialTestHelper.randomLowerCaseString(); + Map pipelines = new HashMap<>(); + pipelines.put(pipelineId, createPipelineConfiguration()); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); + Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceName); + when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor)); + + List ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName); + assertEquals(1, ip2GeoProcessorList.size()); + assertEquals(ip2GeoProcessor.getDatasourceName(), ip2GeoProcessorList.get(0).getDatasourceName()); + } + + private PipelineConfiguration createPipelineConfiguration() { + String id = GeospatialTestHelper.randomLowerCaseString(); + ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII)); + BytesReference config = BytesReference.fromByteBuffer(byteBuffer); + return new PipelineConfiguration(id, config, XContentType.JSON); + } +}