diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index a4c6e5c4..a4706834 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -20,6 +20,7 @@ import org.opensearch.geospatial.exceptions.ResourceInUseException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; @@ -36,6 +37,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction indices) { + if (indices == null || indices.isEmpty()) { + return; + } + + Optional invalidIndex = indices.stream() + .filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false) + .findAny(); + if (invalidIndex.isPresent()) { throw new OpenSearchException( "the index[{}] is not ip2geo data index which should start with {}", - index, + invalidIndex.get(), IP2GEO_DATA_INDEX_NAME_PREFIX ); } - return StashedThreadContext.run( + + AcknowledgedResponse response = StashedThreadContext.run( client, () -> client.admin() .indices() - .prepareDelete(index) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .prepareDelete(indices.toArray(new String[0])) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) ); + + if (response.isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices)); + } } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index dd63d746..b3d6b328 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -50,7 +50,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { /** * Prefix of indices having Ip2Geo data */ - public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data"; + public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".geospatial.ip2geo.data"; /** * Default fields for job scheduling diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 579f5bbe..d4e55f79 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -164,11 +164,8 @@ private List deleteIndices(final List indicesToDelete) { } try { - if (geoIpDataFacade.deleteIp2GeoDataIndex(index).isAcknowledged()) { - deletedIndices.add(index); - } else { - log.error("Failed to delete an index [{}]", index); - } + geoIpDataFacade.deleteIp2GeoDataIndex(index); + deletedIndices.add(index); } catch (Exception e) { log.error("Failed to delete an index [{}]", index, e); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java index e58ab4b9..a06bb30c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java @@ -5,10 +5,13 @@ package org.opensearch.geospatial.ip2geo.listener; +import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; + import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -23,6 +26,7 @@ import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; @@ -37,6 +41,7 @@ public class Ip2GeoListener extends AbstractLifecycleComponent implements Cluste private final ClusterService clusterService; private final ThreadPool threadPool; private final DatasourceFacade datasourceFacade; + private final GeoIpDataFacade geoIpDataFacade; @Override public void clusterChanged(final ClusterChangedEvent event) { @@ -49,11 +54,17 @@ public void clusterChanged(final ClusterChangedEvent event) { continue; } - if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) { - continue; + if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index))) { + threadPool.generic().submit(() -> forceUpdateGeoIpData()); } - threadPool.generic().submit(() -> forceUpdateGeoIpData()); + List ip2GeoDataIndices = entry.indices() + .stream() + .filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX)) + .collect(Collectors.toList()); + if (ip2GeoDataIndices.isEmpty() == false) { + threadPool.generic().submit(() -> geoIpDataFacade.deleteIp2GeoDataIndex(ip2GeoDataIndices)); + } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index f8b40232..01f23e63 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -17,6 +17,7 @@ import java.util.HashSet; import java.util.Locale; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -208,7 +209,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) { datasource.setSystemSchedule(datasource.getUserSchedule()); datasource.setTask(randomTask()); datasource.setState(randomState()); - datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); + datasource.setCurrentIndex(datasource.newIndexName(UUID.randomUUID().toString())); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase() diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 71447454..fa2afef8 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -23,6 +23,8 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; @@ -45,6 +47,7 @@ public void init() { ip2GeoLockService, ingestService, datasourceFacade, + geoIpDataFacade, ip2GeoProcessorFacade ); } @@ -118,7 +121,9 @@ public void testDeleteDatasource_whenSafeToDelete_thenDelete() { // Verify assertEquals(DatasourceState.DELETING, datasource.getState()); verify(datasourceFacade).updateDatasource(datasource); - verify(datasourceFacade).deleteDatasource(datasource); + InOrder inOrder = Mockito.inOrder(geoIpDataFacade, datasourceFacade); + inOrder.verify(geoIpDataFacade).deleteIp2GeoDataIndex(datasource.getIndices()); + inOrder.verify(datasourceFacade).deleteDatasource(datasource); } @SneakyThrows @@ -136,6 +141,7 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti // Verify assertEquals(DatasourceState.AVAILABLE, datasource.getState()); verify(datasourceFacade, never()).updateDatasource(datasource); + verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices()); verify(datasourceFacade, never()).deleteDatasource(datasource); } @@ -154,6 +160,7 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE // Verify verify(datasourceFacade, times(2)).updateDatasource(datasource); + verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices()); verify(datasourceFacade, never()).deleteDatasource(datasource); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index 2a29afb1..43d8727e 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -22,11 +22,11 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; @@ -38,9 +38,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; @@ -218,36 +216,36 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime public void testDeleteDatasource_whenValidInput_thenSucceed() { Datasource datasource = randomDatasource(); - verifyingClient.setExecuteVerifier( - (actionResponse, actionRequest) -> { - // Verify - if (actionRequest instanceof DeleteIndexRequest) { - DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; - assertEquals(datasource.getIndices().size(), request.indices().length); - assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); - - AcknowledgedResponse response = new AcknowledgedResponse(true); - return response; - } else if (actionRequest instanceof DeleteRequest) { - DeleteRequest request = (DeleteRequest) actionRequest; - assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); - assertEquals(DocWriteRequest.OpType.DELETE, request.opType()); - assertEquals(datasource.getName(), request.id()); - assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy()); - - DeleteResponse response = mock(DeleteResponse.class); - when(response.status()).thenReturn(RestStatus.OK); - return response; - } else { - throw new RuntimeException("Not expected request type is passed" + actionRequest.getClass()); - } - } - ); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verify + assertTrue(actionRequest instanceof DeleteRequest); + DeleteRequest request = (DeleteRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(DocWriteRequest.OpType.DELETE, request.opType()); + assertEquals(datasource.getName(), request.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy()); + + DeleteResponse response = mock(DeleteResponse.class); + when(response.status()).thenReturn(RestStatus.OK); + return response; + }); // Run datasourceFacade.deleteDatasource(datasource); } + public void testDeleteDatasource_whenIndexNotFound_thenThrowException() { + Datasource datasource = randomDatasource(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + DeleteResponse response = mock(DeleteResponse.class); + when(response.status()).thenReturn(RestStatus.NOT_FOUND); + return response; + }); + + // Run + expectThrows(ResourceNotFoundException.class, () -> datasourceFacade.deleteDatasource(datasource)); + } + public void testGetDatasources_whenValidInput_thenSucceed() { List datasources = Arrays.asList(randomDatasource(), randomDatasource()); String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index 74d5f189..6ddac0a5 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -51,7 +51,7 @@ import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.Strings; import org.opensearch.common.SuppressForbidden; @@ -168,15 +168,14 @@ public void testInternalGetDatabaseReader_whenCalled_thenSetUserAgent() { verify(connection).addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); } - public void testDeleteIp2GeoDataIndex() { + public void testDeleteIp2GeoDataIndex_whenCalled_thenDeleteIndex() { String index = String.format(Locale.ROOT, "%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, GeospatialTestHelper.randomLowerCaseString()); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assertTrue(actionRequest instanceof DeleteIndexRequest); DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; assertEquals(1, request.indices().length); assertEquals(index, request.indices()[0]); - assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); - return null; + return new AcknowledgedResponse(true); }); verifyingGeoIpDataFacade.deleteIp2GeoDataIndex(index); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index f650fc98..b110ddea 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -28,7 +28,6 @@ import org.apache.commons.csv.CSVParser; import org.junit.Before; import org.opensearch.OpenSearchException; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.SuppressForbidden; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -199,13 +198,13 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { when(metadata.hasIndex(currentIndex)).thenReturn(true); when(metadata.hasIndex(oldIndex)).thenReturn(true); when(metadata.hasIndex(lingeringIndex)).thenReturn(false); - when(geoIpDataFacade.deleteIp2GeoDataIndex(any())).thenReturn(new AcknowledgedResponse(true)); datasourceUpdateService.deleteUnusedIndices(datasource); assertEquals(1, datasource.getIndices().size()); assertEquals(currentIndex, datasource.getIndices().get(0)); verify(datasourceFacade).updateDatasource(datasource); + verify(geoIpDataFacade).deleteIp2GeoDataIndex(oldIndex); } public void testUpdateDatasource_whenNoChange_thenNoUpdate() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java index ff2cd3e3..a72a5a1b 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java @@ -36,7 +36,7 @@ public class Ip2GeoListenerTests extends Ip2GeoTestCase { @Before public void init() { - ip2GeoListener = new Ip2GeoListener(clusterService, threadPool, datasourceFacade); + ip2GeoListener = new Ip2GeoListener(clusterService, threadPool, datasourceFacade, geoIpDataFacade); } public void testDoStart_whenClusterManagerNode_thenAddListener() { @@ -170,4 +170,30 @@ public void testClusterChanged_whenDatasourceIndexIsRestored_thenUpdate() { verify(datasourceFacade).updateDatasource(eq(datasources), any()); } + public void testClusterChanged_whenGeoIpDataIsRestored_thenDelete() { + Datasource datasource = randomDatasource(); + SnapshotId snapshotId = new SnapshotId(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()); + Snapshot snapshot = new Snapshot(GeospatialTestHelper.randomLowerCaseString(), snapshotId); + RestoreInProgress.Entry entry = new RestoreInProgress.Entry( + GeospatialTestHelper.randomLowerCaseString(), + snapshot, + RestoreInProgress.State.SUCCESS, + Arrays.asList(datasource.currentIndexName()), + null + ); + RestoreInProgress restoreInProgress = new RestoreInProgress.Builder().add(entry).build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).thenReturn(restoreInProgress); + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(true); + when(event.state()).thenReturn(clusterState); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool).generic(); + verify(geoIpDataFacade).deleteIp2GeoDataIndex(Arrays.asList(datasource.currentIndexName())); + } + }