From 80821a0e7c30bfa5801a0f54b5a9878815e6b7ca Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Tue, 16 May 2023 15:14:33 -0700 Subject: [PATCH] Bug fix on lock management and few performance improvements * Release lock before response back to caller for update/delete API * Release lock in background task for creation API * Change index settings to improve indexing performance Signed-off-by: Heemin Kim --- .../DeleteDatasourceTransportAction.java | 10 +---- .../action/PutDatasourceTransportAction.java | 36 ++++++++++----- .../UpdateDatasourceTransportAction.java | 19 +++----- .../ip2geo/common/GeoIpDataFacade.java | 44 +++++++++++++------ .../ip2geo/common/Ip2GeoLockService.java | 12 +++-- .../ip2geo/jobscheduler/DatasourceRunner.java | 5 +-- .../DeleteDatasourceTransportActionTests.java | 2 +- .../PutDatasourceTransportActionTests.java | 44 +++++++++++-------- .../UpdateDatasourceTransportActionTests.java | 10 ++--- .../ip2geo/common/GeoIpDataFacadeTests.java | 10 +++-- .../ip2geo/common/Ip2GeoLockServiceTests.java | 2 +- .../jobscheduler/DatasourceRunnerTests.java | 4 +- 12 files changed, 111 insertions(+), 87 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index c287e0f1..a4c6e5c4 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -80,17 +80,11 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request, } try { deleteDatasource(request.getName()); + lockService.releaseLock(lock); listener.onResponse(new AcknowledgedResponse(true)); } catch (Exception e) { + lockService.releaseLock(lock); listener.onFailure(e); - } finally { - lockService.releaseLock( - lock, - ActionListener.wrap( - released -> { log.info("Released lock for datasource[{}]", request.getName()); }, - exception -> { log.error("Failed to release the lock", exception); } - ) - ); } }, exception -> { listener.onFailure(exception); })); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index e6cf7f24..75b30f29 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -80,16 +80,16 @@ protected void doExecute(final Task task, final PutDatasourceRequest request, fi try { internalDoExecute(request, lock, listener); } catch (Exception e) { + lockService.releaseLock(lock); listener.onFailure(e); - } finally { - lockService.releaseLock( - lock, - ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception)) - ); } }, exception -> { listener.onFailure(exception); })); } + /** + * This method takes lock as a parameter and is responsible for releasing lock + * unless exception is thrown + */ @VisibleForTesting protected void internalDoExecute( final PutDatasourceRequest request, @@ -100,17 +100,21 @@ protected void internalDoExecute( datasourceFacade.createIndexIfNotExists(createIndexStep); createIndexStep.whenComplete(v -> { Datasource datasource = Datasource.Builder.build(request); - datasourceFacade.putDatasource( - datasource, - getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener) - ); - }, exception -> listener.onFailure(exception)); + datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener)); + }, exception -> { + lockService.releaseLock(lock); + listener.onFailure(exception); + }); } + /** + * This method takes lock as a parameter and is responsible for releasing lock + * unless exception is thrown + */ @VisibleForTesting protected ActionListener getIndexResponseListener( final Datasource datasource, - final Runnable renewLock, + final LockModel lock, final ActionListener listener ) { return new ActionListener<>() { @@ -118,12 +122,20 @@ protected ActionListener getIndexResponseListener( public void onResponse(final IndexResponse indexResponse) { // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread // pool. - threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); }); + threadPool.generic().submit(() -> { + AtomicReference lockReference = new AtomicReference<>(lock); + try { + createDatasource(datasource, lockService.getRenewLockRunnable(lockReference)); + } finally { + lockService.releaseLock(lockReference.get()); + } + }); listener.onResponse(new AcknowledgedResponse(true)); } @Override public void onFailure(final Exception e) { + lockService.releaseLock(lock); if (e instanceof VersionConflictEngineException) { listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getName())); } else { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index 31c50007..f082c2e4 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -14,15 +14,14 @@ import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; -import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.exceptions.ConcurrentModificationException; +import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -81,27 +80,21 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request, ); return; } + try { Datasource datasource = datasourceFacade.getDatasource(request.getName()); if (datasource == null) { - listener.onFailure(new ResourceNotFoundException("no such datasource exist")); - return; + throw new ResourceNotFoundException("no such datasource exist"); } validate(request, datasource); updateIfChanged(request, datasource); + lockService.releaseLock(lock); listener.onResponse(new AcknowledgedResponse(true)); } catch (Exception e) { + lockService.releaseLock(lock); listener.onFailure(e); - } finally { - lockService.releaseLock( - lock, - ActionListener.wrap( - released -> { log.info("Released lock for datasource[{}]", request.getName()); }, - exception -> { log.error("Failed to release the lock", exception); } - ) - ); } - }, exception -> { listener.onFailure(exception); })); + }, exception -> listener.onFailure(exception))); } private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java index dfd2f099..5584f5eb 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java @@ -43,7 +43,6 @@ import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; @@ -67,6 +66,8 @@ public class GeoIpDataFacade { private static final String IP_RANGE_FIELD_NAME = "_cidr"; private static final String DATA_FIELD_NAME = "_data"; private static final Tuple INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1); + private static final Tuple INDEX_SETTING_NUM_OF_REPLICAS = new Tuple<>("index.number_of_replicas", 0); + private static final Tuple INDEX_SETTING_REFRESH_INTERVAL = new Tuple<>("index.refresh_interval", -1); private static final Tuple INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all"); private static final Tuple INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true); private static final Tuple INDEX_SETTING_READ_ONLY_ALLOW_DELETE = new Tuple<>( @@ -84,7 +85,12 @@ public GeoIpDataFacade(final ClusterService clusterService, final Client client) } /** - * Create an index of single shard with auto expand replicas to all nodes + * Create an index for GeoIP data + * + * Index setting start with single shard, zero replica, no refresh interval, and hidden. + * Once the GeoIP data is indexed, do refresh and force merge. + * Then, change the index setting to expand replica to all nodes, and read only allow delete. + * See {@link #freezeIndex} * * @param indexName index name */ @@ -94,7 +100,8 @@ public void createIndexIfNotExists(final String indexName) { } final Map indexSettings = new HashMap<>(); indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2()); - indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2()); + indexSettings.put(INDEX_SETTING_REFRESH_INTERVAL.v1(), INDEX_SETTING_REFRESH_INTERVAL.v2()); + indexSettings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), INDEX_SETTING_NUM_OF_REPLICAS.v2()); indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2()); final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping()); StashedThreadContext.run( @@ -103,6 +110,24 @@ public void createIndexIfNotExists(final String indexName) { ); } + private void freezeIndex(final String indexName) { + TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT); + StashedThreadContext.run(client, () -> { + client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); + client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); + Map settings = new HashMap<>(); + settings.put(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()); + settings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), null); + settings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2()); + client.admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(settings) + .execute() + .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)); + }); + } + /** * Generate XContentBuilder representing datasource database index mapping * @@ -349,7 +374,7 @@ public void putGeoIpData( @NonNull final Runnable renewLock ) { TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT); - final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + final BulkRequest bulkRequest = new BulkRequest(); while (iterator.hasNext()) { CSVRecord record = iterator.next(); String document = createDocument(fields, record.values()); @@ -368,16 +393,7 @@ public void putGeoIpData( } renewLock.run(); } - StashedThreadContext.run(client, () -> { - client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); - client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); - client.admin() - .indices() - .prepareUpdateSettings(indexName) - .setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2())) - .execute() - .actionGet(timeout); - }); + freezeIndex(indexName); } public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java index ab646628..6b3c83b7 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -12,6 +12,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.log4j.Log4j2; + import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.client.Client; @@ -22,6 +24,7 @@ /** * A wrapper of job scheduler's lock service for datasource */ +@Log4j2 public class Ip2GeoLockService { public static final long LOCK_DURATION_IN_SECONDS = 300l; public static final long RENEW_AFTER_IN_SECONDS = 120l; @@ -57,10 +60,12 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco * Wrapper method of LockService#release * * @param lockModel the lock model - * @param listener the listener */ - public void releaseLock(final LockModel lockModel, final ActionListener listener) { - lockService.release(lockModel, listener); + public void releaseLock(final LockModel lockModel) { + lockService.release( + lockModel, + ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception)) + ); } /** @@ -110,7 +115,6 @@ public Runnable getRenewLockRunnable(final AtomicReference lockModel) if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) { return; } - lockModel.set(renewLock(lockModel.get())); if (lockModel.get() == null) { new OpenSearchException("failed to renew a lock [{}]", preLock); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index 4233c8a8..0ae2953a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -111,10 +111,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet try { updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); } finally { - ip2GeoLockService.releaseLock( - lock, - ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); }) - ); + ip2GeoLockService.releaseLock(lock); } }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); })); }; diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 54f9ed61..71447454 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -90,7 +90,7 @@ private void validateDoExecute(final LockModel lockModel, final Exception except verify(listener).onFailure(any(OpenSearchException.class)); } else { verify(listener).onResponse(new AcknowledgedResponse(true)); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } } else { // Run diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java index 339c65cb..156c9d0d 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java @@ -12,22 +12,20 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.io.IOException; -import java.time.Instant; import lombok.SneakyThrows; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.unit.TimeValue; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; @@ -52,28 +50,32 @@ public void init() { @SneakyThrows public void testDoExecute_whenFailedToAcquireLock_thenError() { - validateDoExecute(null, null); + validateDoExecute(null, null, null); } @SneakyThrows - public void testDoExecute_whenValidInput_thenSucceed() { - String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); - String jobId = GeospatialTestHelper.randomLowerCaseString(); - LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); - validateDoExecute(lockModel, null); + public void testDoExecute_whenAcquiredLock_thenSucceed() { + validateDoExecute(randomLockModel(), null, null); } @SneakyThrows - public void testDoExecute_whenException_thenError() { - validateDoExecute(null, new RuntimeException()); + public void testDoExecute_whenExceptionBeforeAcquiringLock_thenError() { + validateDoExecute(randomLockModel(), new RuntimeException(), null); } - private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { + @SneakyThrows + public void testDoExecute_whenExceptionAfterAcquiringLock_thenError() { + validateDoExecute(randomLockModel(), null, new RuntimeException()); + } + + private void validateDoExecute(final LockModel lockModel, final Exception before, final Exception after) throws IOException { Task task = mock(Task.class); Datasource datasource = randomDatasource(); - when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); PutDatasourceRequest request = new PutDatasourceRequest(datasource.getName()); ActionListener listener = mock(ActionListener.class); + if (after != null) { + doThrow(after).when(datasourceFacade).createIndexIfNotExists(any(StepListener.class)); + } // Run action.doExecute(task, request, listener); @@ -82,21 +84,25 @@ private void validateDoExecute(final LockModel lockModel, final Exception except ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); - if (exception == null) { + if (before == null) { // Run captor.getValue().onResponse(lockModel); // Verify if (lockModel == null) { - verify(listener).onFailure(any(OpenSearchException.class)); + verify(listener).onFailure(any(ConcurrentModificationException.class)); + } + if (after != null) { + verify(ip2GeoLockService).releaseLock(eq(lockModel)); + verify(listener).onFailure(after); } else { - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService, never()).releaseLock(eq(lockModel)); } } else { // Run - captor.getValue().onFailure(exception); + captor.getValue().onFailure(before); // Verify - verify(listener).onFailure(exception); + verify(listener).onFailure(before); } } @@ -133,7 +139,7 @@ public void testInternalDoExecute_whenValidInput_thenSucceed() { public void testGetIndexResponseListener_whenVersionConflict_thenFailure() { Datasource datasource = new Datasource(); ActionListener listener = mock(ActionListener.class); - action.getIndexResponseListener(datasource, mock(Runnable.class), listener) + action.getIndexResponseListener(datasource, randomLockModel(), listener) .onFailure( new VersionConflictEngineException( null, diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index 76763fb1..9364115f 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -112,7 +112,7 @@ public void testDoExecute_whenValidInput_thenUpdate() { assertEquals(request.getEndpoint(), datasource.getEndpoint()); assertEquals(request.getUpdateInterval().days(), datasource.getSchedule().getInterval()); verify(listener).onResponse(new AcknowledgedResponse(true)); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows @@ -142,7 +142,7 @@ public void testDoExecute_whenNoChangesInValues_thenNoUpdate() { verify(datasourceUpdateService, never()).getHeaderFields(anyString()); verify(datasourceFacade, never()).updateDatasource(datasource); verify(listener).onResponse(new AcknowledgedResponse(true)); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows @@ -169,7 +169,7 @@ public void testDoExecute_whenNoDatasource_thenError() { verify(listener).onFailure(exceptionCaptor.capture()); assertEquals(ResourceNotFoundException.class, exceptionCaptor.getValue().getClass()); exceptionCaptor.getValue().getMessage().contains("no such datasource exist"); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows @@ -200,7 +200,7 @@ public void testDoExecute_whenIncompatibleFields_thenError() { verify(listener).onFailure(exceptionCaptor.capture()); assertEquals(IncompatibleDatasourceException.class, exceptionCaptor.getValue().getClass()); exceptionCaptor.getValue().getMessage().contains("does not contain"); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows @@ -229,6 +229,6 @@ public void testDoExecute_whenInvalidUpdateInterval_thenError() { verify(listener).onFailure(exceptionCaptor.capture()); assertEquals(InvalidParameterException.class, exceptionCaptor.getValue().getClass()); exceptionCaptor.getValue().getMessage().contains("should be smaller"); - verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index 5a269539..19ee2bc7 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -51,7 +51,6 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.WriteRequest; import org.opensearch.common.Randomness; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesReference; @@ -89,8 +88,10 @@ public void testCreateIndexIfNotExistsWithoutExistingIndex() { assertTrue(actionRequest instanceof CreateIndexRequest); CreateIndexRequest request = (CreateIndexRequest) actionRequest; assertEquals(index, request.index()); - assertEquals(1, (int) request.settings().getAsInt("index.number_of_shards", 2)); - assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); + assertEquals(1, (int) request.settings().getAsInt("index.number_of_shards", 0)); + assertNull(request.settings().get("index.auto_expand_replicas")); + assertEquals(0, (int) request.settings().getAsInt("index.number_of_replicas", 1)); + assertEquals(-1, (int) request.settings().getAsInt("index.refresh_interval", 0)); assertEquals(true, request.settings().getAsBoolean("index.hidden", false)); assertEquals( @@ -191,7 +192,6 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { if (actionRequest instanceof BulkRequest) { BulkRequest request = (BulkRequest) actionRequest; assertEquals(1, request.numberOfActions()); - assertEquals(WriteRequest.RefreshPolicy.WAIT_UNTIL, request.getRefreshPolicy()); BulkResponse response = mock(BulkResponse.class); when(response.hasFailures()).thenReturn(false); return response; @@ -211,6 +211,8 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { assertEquals(1, request.indices().length); assertEquals(index, request.indices()[0]); assertEquals(true, request.settings().getAsBoolean("index.blocks.read_only_allow_delete", false)); + assertNull(request.settings().get("index.num_of_replica")); + assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); return null; } else { throw new RuntimeException("invalid request is called"); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java index f0bc8ce7..d6bfcd13 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java @@ -41,7 +41,7 @@ public void testAcquireLock_whenValidInput_thenSucceed() { public void testReleaseLock_whenValidInput_thenSucceed() { // Cannot test because LockService is final class // Simply calling method to increase coverage - noOpsLockService.releaseLock(null, mock(ActionListener.class)); + noOpsLockService.releaseLock(null); } public void testRenewLock_whenCalled_thenNotBlocked() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index 1ccd88a5..004e1dec 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -98,13 +98,13 @@ private void validateDoExecute(final LockModel lockModel, final Exception except captor.getValue().onResponse(lockModel); // Verify - verify(ip2GeoLockService, lockModel == null ? never() : times(1)).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService, lockModel == null ? never() : times(1)).releaseLock(eq(lockModel)); } else { // Run captor.getValue().onFailure(exception); // Verify - verify(ip2GeoLockService, never()).releaseLock(eq(lockModel), any(ActionListener.class)); + verify(ip2GeoLockService, never()).releaseLock(eq(lockModel)); } }