diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index 7ae05cdd..fa3fcf48 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -5,11 +5,14 @@ package org.opensearch.geospatial.ip2geo.action; -import java.io.IOException; +import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; + import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; import lombok.extern.log4j.Log4j2; +import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -21,9 +24,11 @@ import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -36,6 +41,7 @@ public class PutDatasourceTransportAction extends HandledTransportAction listener) { - try { - StepListener createIndexStep = new StepListener<>(); - datasourceFacade.createIndexIfNotExists(createIndexStep); - createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception)); - } catch (Exception e) { - listener.onFailure(e); - } + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + return; + } + try { + internalDoExecute(request, lock, listener); + } catch (Exception e) { + listener.onFailure(e); + } finally { + lockService.releaseLock( + lock, + ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception)) + ); + } + }, exception -> { listener.onFailure(exception); })); } @VisibleForTesting - protected void putDatasource(final PutDatasourceRequest request, final ActionListener listener) - throws IOException { - Datasource datasource = Datasource.Builder.build(request); - datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, listener)); + protected void internalDoExecute( + final PutDatasourceRequest request, + final LockModel lock, + final ActionListener listener + ) { + StepListener createIndexStep = new StepListener<>(); + datasourceFacade.createIndexIfNotExists(createIndexStep); + createIndexStep.whenComplete(v -> { + Datasource datasource = Datasource.Builder.build(request); + datasourceFacade.putDatasource( + datasource, + getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener) + ); + }, exception -> listener.onFailure(exception)); } @VisibleForTesting protected ActionListener getIndexResponseListener( final Datasource datasource, + final Runnable renewLock, final ActionListener listener ) { return new ActionListener<>() { @@ -87,7 +116,7 @@ protected ActionListener getIndexResponseListener( public void onResponse(final IndexResponse indexResponse) { // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread // pool. - threadPool.generic().submit(() -> { createDatasource(datasource); }); + threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); }); listener.onResponse(new AcknowledgedResponse(true)); } @@ -103,7 +132,7 @@ public void onFailure(final Exception e) { } @VisibleForTesting - protected void createDatasource(final Datasource datasource) { + protected void createDatasource(final Datasource datasource, final Runnable renewLock) { if (DatasourceState.CREATING.equals(datasource.getState()) == false) { log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.CREATING, datasource.getState()); markDatasourceAsCreateFailed(datasource); @@ -111,7 +140,7 @@ protected void createDatasource(final Datasource datasource) { } try { - datasourceUpdateService.updateOrCreateGeoIpData(datasource); + datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock); } catch (Exception e) { log.error("Failed to create datasource for {}", datasource.getName(), e); markDatasourceAsCreateFailed(datasource); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java index bf436c33..95ae8003 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java @@ -25,6 +25,7 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import lombok.NonNull; import lombok.extern.log4j.Log4j2; import org.apache.commons.csv.CSVFormat; @@ -332,8 +333,15 @@ public void onFailure(final Exception e) { * @param fields Field name matching with data in CSVRecord in order * @param iterator GeoIP data to insert * @param bulkSize Bulk size of data to process + * @param renewLock Runnable to renew lock */ - public void putGeoIpData(final String indexName, final String[] fields, final Iterator iterator, final int bulkSize) { + public void putGeoIpData( + @NonNull final String indexName, + @NonNull final String[] fields, + @NonNull final Iterator iterator, + final int bulkSize, + @NonNull final Runnable renewLock + ) { TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT); final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); while (iterator.hasNext()) { @@ -352,6 +360,7 @@ public void putGeoIpData(final String indexName, final String[] fields, final It } bulkRequest.requests().clear(); } + renewLock.run(); } StashedThreadContext.run(client, () -> { client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java index cd41251a..ab646628 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -7,15 +7,15 @@ import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; +import java.time.Instant; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.unit.TimeValue; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.utils.LockService; @@ -23,8 +23,9 @@ * A wrapper of job scheduler's lock service for datasource */ public class Ip2GeoLockService { + public static final long LOCK_DURATION_IN_SECONDS = 300l; + public static final long RENEW_AFTER_IN_SECONDS = 120l; private final ClusterService clusterService; - private final Client client; private final LockService lockService; /** @@ -33,10 +34,8 @@ public class Ip2GeoLockService { * @param clusterService the cluster service * @param client the client */ - @Inject public Ip2GeoLockService(final ClusterService clusterService, final Client client) { this.clusterService = clusterService; - this.client = client; this.lockService = new LockService(client, clusterService); } @@ -68,10 +67,9 @@ public void releaseLock(final LockModel lockModel, final ActionListener * Synchronous method of LockService#renewLock * * @param lockModel lock to renew - * @param timeout timeout in milliseconds precise * @return renewed lock if renew succeed and null otherwise */ - public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) { + public LockModel renewLock(final LockModel lockModel) { AtomicReference lockReference = new AtomicReference(); CountDownLatch countDownLatch = new CountDownLatch(1); lockService.renewLock(lockModel, new ActionListener<>() { @@ -89,10 +87,34 @@ public void onFailure(final Exception e) { }); try { - countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS); + countDownLatch.await(clusterService.getClusterSettings().get(Ip2GeoSettings.TIMEOUT).getSeconds(), TimeUnit.SECONDS); return lockReference.get(); } catch (InterruptedException e) { return null; } } + + /** + * Return a runnable which can renew the given lock model + * + * The runnable renews the lock and store the renewed lock in the AtomicReference. + * It only renews the lock when it passed {@code RENEW_AFTER_IN_SECONDS} since + * the last time the lock was renewed to avoid resource abuse. + * + * @param lockModel lock model to renew + * @return runnable which can renew the given lock for every call + */ + public Runnable getRenewLockRunnable(final AtomicReference lockModel) { + return () -> { + LockModel preLock = lockModel.get(); + if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) { + return; + } + + lockModel.set(renewLock(lockModel.get())); + if (lockModel.get() == null) { + new OpenSearchException("failed to renew a lock [{}]", preLock); + } + }; + } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 2604490f..f27ab286 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -33,6 +33,7 @@ import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; @@ -50,7 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter { * Prefix of indices having Ip2Geo data */ public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data"; - private static final long LOCK_DURATION_IN_SECONDS = 60 * 60; private static final long MAX_JITTER_IN_MINUTES = 5; private static final long ONE_DAY_IN_HOURS = 24; private static final long ONE_HOUR_IN_MINUTES = 60; @@ -282,7 +282,7 @@ public boolean isEnabled() { @Override public Long getLockDurationSeconds() { - return LOCK_DURATION_IN_SECONDS; + return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index 0ad8438b..4233c8a8 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; import lombok.extern.log4j.Log4j2; @@ -16,10 +17,10 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.jobscheduler.spi.utils.LockService; /** * Datasource update task @@ -52,6 +53,7 @@ public static DatasourceRunner getJobRunnerInstance() { private DatasourceUpdateService datasourceUpdateService; private Ip2GeoExecutor ip2GeoExecutor; private DatasourceFacade datasourceFacade; + private Ip2GeoLockService ip2GeoLockService; private boolean initialized; private DatasourceRunner() { @@ -65,12 +67,14 @@ public void initialize( final ClusterService clusterService, final DatasourceUpdateService datasourceUpdateService, final Ip2GeoExecutor ip2GeoExecutor, - final DatasourceFacade datasourceFacade + final DatasourceFacade datasourceFacade, + final Ip2GeoLockService ip2GeoLockService ) { this.clusterService = clusterService; this.datasourceUpdateService = datasourceUpdateService; this.ip2GeoExecutor = ip2GeoExecutor; this.datasourceFacade = datasourceFacade; + this.ip2GeoLockService = ip2GeoLockService; this.initialized = true; } @@ -87,30 +91,27 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC ); } - ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter, context)); + ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter)); } /** * Update GeoIP data * * Lock is used so that only one of nodes run this task. - * Lock duration is 1 hour to avoid refreshing. This is okay because update interval is 1 day minimum. * * @param jobParameter job parameter - * @param context context */ @VisibleForTesting - protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter, final JobExecutionContext context) { - final LockService lockService = context.getLockService(); + protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) { return () -> { - lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { return; } try { - updateDatasource(jobParameter); + updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock))); } finally { - lockService.release( + ip2GeoLockService.releaseLock( lock, ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); }) ); @@ -120,7 +121,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet } @VisibleForTesting - protected void updateDatasource(final ScheduledJobParameter jobParameter) throws IOException { + protected void updateDatasource(final ScheduledJobParameter jobParameter, final Runnable renewLock) throws IOException { Datasource datasource = datasourceFacade.getDatasource(jobParameter.getName()); /** * If delete request comes while update task is waiting on a queue for other update tasks to complete, @@ -143,7 +144,7 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter) throws try { datasourceUpdateService.deleteUnusedIndices(datasource); - datasourceUpdateService.updateOrCreateGeoIpData(datasource); + datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock); datasourceUpdateService.deleteUnusedIndices(datasource); } catch (Exception e) { log.error("Failed to update datasource for {}", datasource.getName(), e); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 2a6155d3..57be4d4a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -48,10 +48,11 @@ public DatasourceUpdateService( /** * Update GeoIp data * - * @param datasource + * @param datasource the datasource + * @param renewLock runnable to renew lock * @throws Exception */ - public void updateOrCreateGeoIpData(final Datasource datasource) throws Exception { + public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable renewLock) throws Exception { URL url = new URL(datasource.getEndpoint()); DatasourceManifest manifest = DatasourceManifest.Builder.build(url); @@ -77,7 +78,13 @@ public void updateOrCreateGeoIpData(final Datasource datasource) throws Exceptio datasource.getDatabase().getFields().toString() ); } - geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE)); + geoIpDataFacade.putGeoIpData( + indexName, + header, + reader.iterator(), + clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE), + renewLock + ); } Instant endTime = Instant.now(); diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index e52939b0..6febdf3f 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -45,6 +45,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; @@ -130,13 +131,22 @@ public Collection createComponents( DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService); DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceFacade, geoIpDataFacade); Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool); + Ip2GeoLockService ip2GeoLockService = new Ip2GeoLockService(clusterService, client); /** * We don't need to return datasource runner because it is used only by job scheduler and job scheduler * does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance. */ - DatasourceRunner.getJobRunnerInstance().initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade, ip2GeoLockService); - return List.of(UploadStats.getInstance(), datasourceUpdateService, datasourceFacade, ip2GeoExecutor, geoIpDataFacade); + return List.of( + UploadStats.getInstance(), + datasourceUpdateService, + datasourceFacade, + ip2GeoExecutor, + geoIpDataFacade, + ip2GeoLockService + ); } @Override diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index ee57b5ba..a28ba089 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -15,7 +15,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Locale; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -43,10 +42,12 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.ingest.IngestService; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.tasks.Task; @@ -66,8 +67,6 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase { @Mock protected Ip2GeoExecutor ip2GeoExecutor; @Mock - protected ExecutorService executorService; - @Mock protected GeoIpDataFacade geoIpDataFacade; @Mock protected ClusterState clusterState; @@ -81,6 +80,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase { protected ThreadPool threadPool; @Mock protected TransportService transportService; + @Mock + protected Ip2GeoLockService ip2GeoLockService; protected NoOpNodeClient client; protected VerifyingClient verifyingClient; protected LockService lockService; @@ -101,7 +102,7 @@ public void prepareIp2GeoTestCase() { when(clusterService.state()).thenReturn(clusterState); when(clusterState.metadata()).thenReturn(metadata); when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE); - when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(executorService); + when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(OpenSearchExecutors.newDirectExecutorService()); when(ingestService.getClusterService()).thenReturn(clusterService); when(threadPool.generic()).thenReturn(OpenSearchExecutors.newDirectExecutorService()); } @@ -190,6 +191,17 @@ protected Datasource randomDatasource() { return datasource; } + protected LockModel randomLockModel() { + LockModel lockModel = new LockModel( + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString(), + Instant.now(), + randomPositiveLong(), + false + ); + return lockModel; + } + /** * Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167) * is merged in OpenSearch core diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index dc150b18..2c8bf9f6 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -36,7 +36,11 @@ public void testValidate_whenInvalidManifestFile_thenFails() { PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); request.setUpdateInterval(TimeValue.timeValueDays(1)); + + // Run ActionRequestValidationException exception = request.validate(); + + // Verify assertEquals(1, exception.validationErrors().size()); assertTrue(exception.validationErrors().get(0).contains("Error occurred while reading a file")); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java index 0c7bdf6d..339c65cb 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java @@ -6,14 +6,22 @@ package org.opensearch.geospatial.ip2geo.action; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Instant; import lombok.SneakyThrows; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -24,6 +32,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; public class PutDatasourceTransportActionTests extends Ip2GeoTestCase { @@ -31,19 +40,75 @@ public class PutDatasourceTransportActionTests extends Ip2GeoTestCase { @Before public void init() { - action = new PutDatasourceTransportAction(transportService, actionFilters, threadPool, datasourceFacade, datasourceUpdateService); + action = new PutDatasourceTransportAction( + transportService, + actionFilters, + threadPool, + datasourceFacade, + datasourceUpdateService, + ip2GeoLockService + ); + } + + @SneakyThrows + public void testDoExecute_whenFailedToAcquireLock_thenError() { + validateDoExecute(null, null); } @SneakyThrows public void testDoExecute_whenValidInput_thenSucceed() { + String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); + String jobId = GeospatialTestHelper.randomLowerCaseString(); + LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); + validateDoExecute(lockModel, null); + } + + @SneakyThrows + public void testDoExecute_whenException_thenError() { + validateDoExecute(null, new RuntimeException()); + } + + private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + PutDatasourceRequest request = new PutDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + if (exception == null) { + // Run + captor.getValue().onResponse(lockModel); + + // Verify + if (lockModel == null) { + verify(listener).onFailure(any(OpenSearchException.class)); + } else { + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + } else { + // Run + captor.getValue().onFailure(exception); + // Verify + verify(listener).onFailure(exception); + } + } + + @SneakyThrows + public void testInternalDoExecute_whenValidInput_thenSucceed() { PutDatasourceRequest request = new PutDatasourceRequest(GeospatialTestHelper.randomLowerCaseString()); request.setEndpoint(sampleManifestUrl()); request.setUpdateInterval(TimeValue.timeValueDays(1)); ActionListener listener = mock(ActionListener.class); // Run - action.doExecute(task, request, listener); + action.internalDoExecute(request, randomLockModel(), listener); // Verify ArgumentCaptor captor = ArgumentCaptor.forClass(StepListener.class); @@ -68,7 +133,7 @@ public void testDoExecute_whenValidInput_thenSucceed() { public void testGetIndexResponseListener_whenVersionConflict_thenFailure() { Datasource datasource = new Datasource(); ActionListener listener = mock(ActionListener.class); - action.getIndexResponseListener(datasource, listener) + action.getIndexResponseListener(datasource, mock(Runnable.class), listener) .onFailure( new VersionConflictEngineException( null, @@ -86,21 +151,22 @@ public void testCreateDatasource_whenInvalidState_thenUpdateStateAsFailed() { datasource.getUpdateStats().setLastFailedAt(null); // Run - action.createDatasource(datasource); + action.createDatasource(datasource, mock(Runnable.class)); // Verify assertEquals(DatasourceState.CREATE_FAILED, datasource.getState()); assertNotNull(datasource.getUpdateStats().getLastFailedAt()); verify(datasourceFacade).updateDatasource(datasource); + verify(datasourceUpdateService, never()).updateOrCreateGeoIpData(any(Datasource.class), any(Runnable.class)); } @SneakyThrows public void testCreateDatasource_whenExceptionHappens_thenUpdateStateAsFailed() { Datasource datasource = new Datasource(); - doThrow(new RuntimeException()).when(datasourceUpdateService).updateOrCreateGeoIpData(datasource); + doThrow(new RuntimeException()).when(datasourceUpdateService).updateOrCreateGeoIpData(any(Datasource.class), any(Runnable.class)); // Run - action.createDatasource(datasource); + action.createDatasource(datasource, mock(Runnable.class)); // Verify assertEquals(DatasourceState.CREATE_FAILED, datasource.getState()); @@ -112,11 +178,12 @@ public void testCreateDatasource_whenExceptionHappens_thenUpdateStateAsFailed() public void testCreateDatasource_whenValidInput_thenUpdateStateAsCreating() { Datasource datasource = new Datasource(); + Runnable renewLock = mock(Runnable.class); // Run - action.createDatasource(datasource); + action.createDatasource(datasource, renewLock); // Verify - verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource); + verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource, renewLock); assertEquals(DatasourceState.CREATING, datasource.getState()); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index d86c2795..54c98a41 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -25,6 +25,8 @@ import java.util.Locale; import java.util.Map; +import lombok.SneakyThrows; + import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -157,7 +159,8 @@ public void testDeleteIp2GeoDataIndexWithNonIp2GeoDataIndex() { verify(verifyingClient, never()).index(any()); } - public void testPutGeoIpData() throws Exception { + @SneakyThrows + public void testPutGeoIpData_whenValidInput_thenSucceed() { String index = GeospatialTestHelper.randomLowerCaseString(); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { if (actionRequest instanceof BulkRequest) { @@ -188,10 +191,12 @@ public void testPutGeoIpData() throws Exception { throw new RuntimeException("invalid request is called"); } }); + Runnable renewLock = mock(Runnable.class); try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) { Iterator iterator = csvParser.iterator(); String[] fields = iterator.next().values(); - verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1); + verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1, renewLock); + verify(renewLock, times(2)).run(); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java index a746f314..f0bc8ce7 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java @@ -6,41 +6,103 @@ package org.opensearch.geospatial.ip2geo.common; import static org.mockito.Mockito.mock; +import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; +import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.RENEW_AFTER_IN_SECONDS; import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.opensearch.action.ActionListener; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.index.shard.ShardId; +import org.opensearch.jobscheduler.spi.LockModel; public class Ip2GeoLockServiceTests extends Ip2GeoTestCase { private Ip2GeoLockService ip2GeoLockService; + private Ip2GeoLockService noOpsLockService; @Before public void init() { - ip2GeoLockService = new Ip2GeoLockService(clusterService, client); + ip2GeoLockService = new Ip2GeoLockService(clusterService, verifyingClient); + noOpsLockService = new Ip2GeoLockService(clusterService, client); } public void testAcquireLock_whenValidInput_thenSucceed() { // Cannot test because LockService is final class // Simply calling method to increase coverage - ip2GeoLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class)); + noOpsLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class)); } public void testReleaseLock_whenValidInput_thenSucceed() { // Cannot test because LockService is final class // Simply calling method to increase coverage - ip2GeoLockService.releaseLock(null, mock(ActionListener.class)); + noOpsLockService.releaseLock(null, mock(ActionListener.class)); } public void testRenewLock_whenCalled_thenNotBlocked() { - long timeoutInMillis = 10000; long expectedDurationInMillis = 1000; Instant before = Instant.now(); - assertNull(ip2GeoLockService.renewLock(null, TimeValue.timeValueMillis(timeoutInMillis))); + assertNull(ip2GeoLockService.renewLock(null)); Instant after = Instant.now(); assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); } + + public void testGetRenewLockRunnable_whenLockIsFresh_thenDoNotRenew() { + LockModel lockModel = new LockModel( + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString(), + Instant.now(), + LOCK_DURATION_IN_SECONDS, + false + ); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof UpdateRequest); + return new UpdateResponse( + mock(ShardId.class), + GeospatialTestHelper.randomLowerCaseString(), + randomPositiveLong(), + randomPositiveLong(), + randomPositiveLong(), + DocWriteResponse.Result.UPDATED + ); + }); + + AtomicReference reference = new AtomicReference<>(lockModel); + ip2GeoLockService.getRenewLockRunnable(reference).run(); + assertEquals(lockModel, reference.get()); + } + + public void testGetRenewLockRunnable_whenLockIsStale_thenRenew() { + LockModel lockModel = new LockModel( + GeospatialTestHelper.randomLowerCaseString(), + GeospatialTestHelper.randomLowerCaseString(), + Instant.now().minusSeconds(RENEW_AFTER_IN_SECONDS), + LOCK_DURATION_IN_SECONDS, + false + ); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof UpdateRequest); + return new UpdateResponse( + mock(ShardId.class), + GeospatialTestHelper.randomLowerCaseString(), + randomPositiveLong(), + randomPositiveLong(), + randomPositiveLong(), + DocWriteResponse.Result.UPDATED + ); + }); + + AtomicReference reference = new AtomicReference<>(lockModel); + ip2GeoLockService.getRenewLockRunnable(reference).run(); + assertNotEquals(lockModel, reference.get()); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index 274689ac..1ccd88a5 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -6,6 +6,8 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -14,52 +16,111 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString; +import java.io.IOException; import java.time.Instant; +import lombok.SneakyThrows; + import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.jobscheduler.spi.JobDocVersion; import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; public class DatasourceRunnerTests extends Ip2GeoTestCase { @Before public void init() { - DatasourceRunner.getJobRunnerInstance().initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade); + DatasourceRunner.getJobRunnerInstance() + .initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade, ip2GeoLockService); } - public void testRunJobInvalidClass() { + public void testRunJob_whenInvalidClass_thenThrowException() { JobExecutionContext jobExecutionContext = mock(JobExecutionContext.class); ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext)); } - public void testRunJob() { + public void testRunJob_whenValidInput_thenSucceed() { JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); String jobIndexName = randomLowerCaseString(); String jobId = randomLowerCaseString(); JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId); - Datasource datasource = new Datasource(); + Datasource datasource = randomDatasource(); // Run DatasourceRunner.getJobRunnerInstance().runJob(datasource, jobExecutionContext); // Verify - verify(executorService).submit(any(Runnable.class)); + verify(ip2GeoLockService).acquireLock( + eq(datasource.getName()), + eq(Ip2GeoLockService.LOCK_DURATION_IN_SECONDS), + any(ActionListener.class) + ); + } + + @SneakyThrows + public void testUpdateDatasourceRunner_whenFailedToAcquireLock_thenError() { + validateDoExecute(null, null); + } + + @SneakyThrows + public void testUpdateDatasourceRunner_whenValidInput_thenSucceed() { + String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); + String jobId = GeospatialTestHelper.randomLowerCaseString(); + LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); + validateDoExecute(lockModel, null); + } + + @SneakyThrows + public void testUpdateDatasourceRunner_whenException_thenError() { + validateDoExecute(null, new RuntimeException()); + } + + private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { + ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class); + when(jobParameter.getName()).thenReturn(GeospatialTestHelper.randomLowerCaseString()); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasourceRunner(jobParameter).run(); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(jobParameter.getName()), anyLong(), captor.capture()); + + if (exception == null) { + // Run + captor.getValue().onResponse(lockModel); + + // Verify + verify(ip2GeoLockService, lockModel == null ? never() : times(1)).releaseLock(eq(lockModel), any(ActionListener.class)); + } else { + // Run + captor.getValue().onFailure(exception); + + // Verify + verify(ip2GeoLockService, never()).releaseLock(eq(lockModel), any(ActionListener.class)); + } } - public void testUpdateDatasourceNull() throws Exception { + @SneakyThrows + public void testUpdateDatasource_whenDatasourceDoesNotExist_thenDoNothing() { Datasource datasource = new Datasource(); // Run - DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, mock(Runnable.class)); // Verify verify(datasourceUpdateService, never()).deleteUnusedIndices(any()); } - public void testUpdateDatasourceInvalidState() throws Exception { + @SneakyThrows + public void testUpdateDatasource_whenInvalidState_thenUpdateLastFailedAt() { Datasource datasource = new Datasource(); datasource.enable(); datasource.getUpdateStats().setLastFailedAt(null); @@ -67,7 +128,7 @@ public void testUpdateDatasourceInvalidState() throws Exception { when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); // Run - DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, mock(Runnable.class)); // Verify assertFalse(datasource.isEnabled()); @@ -75,21 +136,24 @@ public void testUpdateDatasourceInvalidState() throws Exception { verify(datasourceFacade).updateDatasource(datasource); } - public void testUpdateDatasource() throws Exception { + @SneakyThrows + public void testUpdateDatasource_whenValidInput_thenSucceed() { Datasource datasource = new Datasource(); datasource.setState(DatasourceState.AVAILABLE); datasource.setName(randomLowerCaseString()); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + Runnable renewLock = mock(Runnable.class); // Run - DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, renewLock); // Verify verify(datasourceUpdateService, times(2)).deleteUnusedIndices(datasource); - verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource); + verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource, renewLock); } - public void testUpdateDatasourceExceptionHandling() throws Exception { + @SneakyThrows + public void testUpdateDatasourceExceptionHandling() { Datasource datasource = new Datasource(); datasource.setName(randomLowerCaseString()); datasource.getUpdateStats().setLastFailedAt(null); @@ -97,7 +161,7 @@ public void testUpdateDatasourceExceptionHandling() throws Exception { doThrow(new RuntimeException("test failure")).when(datasourceUpdateService).deleteUnusedIndices(any()); // Run - DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource); + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, mock(Runnable.class)); // Verify assertNotNull(datasource.getUpdateStats().getLastFailedAt()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index f7ca7d2e..6e51c926 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -6,6 +6,10 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -14,6 +18,9 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; +import java.util.Iterator; + +import lombok.SneakyThrows; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; @@ -35,7 +42,8 @@ public void init() { datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceFacade, geoIpDataFacade); } - public void testUpdateDatasourceSkip() throws Exception { + @SneakyThrows + public void testUpdateOrCreateGeoIpData_whenHashValueIsSame_thenSkipUpdate() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); @@ -47,14 +55,15 @@ public void testUpdateDatasourceSkip() throws Exception { datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); // Run - datasourceUpdateService.updateOrCreateGeoIpData(datasource); + datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class)); // Verify assertNotNull(datasource.getUpdateStats().getLastSkippedAt()); verify(datasourceFacade).updateDatasource(datasource); } - public void testUpdateDatasourceInvalidFile() throws Exception { + @SneakyThrows + public void testUpdateOrCreateGeoIpData_whenInvalidData_thenThrowException() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); @@ -71,10 +80,11 @@ public void testUpdateDatasourceInvalidFile() throws Exception { datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); // Run - expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateOrCreateGeoIpData(datasource)); + expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class))); } - public void testUpdateDatasourceIncompatibleFields() throws Exception { + @SneakyThrows + public void testUpdateOrCreateGeoIpData_whenIncompatibleFields_thenThrowException() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); @@ -89,10 +99,11 @@ public void testUpdateDatasourceIncompatibleFields() throws Exception { datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm()); // Run - expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateOrCreateGeoIpData(datasource)); + expectThrows(OpenSearchException.class, () -> datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class))); } - public void testUpdateDatasource() throws Exception { + @SneakyThrows + public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL()); @@ -109,7 +120,7 @@ public void testUpdateDatasource() throws Exception { datasource.getUpdateStats().setLastProcessingTimeInMillis(null); // Run - datasourceUpdateService.updateOrCreateGeoIpData(datasource); + datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class)); // Verify assertEquals(manifest.getProvider(), datasource.getDatabase().getProvider()); @@ -119,9 +130,17 @@ public void testUpdateDatasource() throws Exception { assertNotNull(datasource.getUpdateStats().getLastSucceededAt()); assertNotNull(datasource.getUpdateStats().getLastProcessingTimeInMillis()); verify(datasourceFacade, times(2)).updateDatasource(datasource); + verify(geoIpDataFacade).putGeoIpData( + eq(datasource.currentIndexName()), + isA(String[].class), + any(Iterator.class), + anyInt(), + any(Runnable.class) + ); } - public void testDeleteUnusedIndices() throws Exception { + @SneakyThrows + public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); String indexPrefix = String.format(".ip2geo-data.%s.", datasourceName); Instant now = Instant.now(); diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 3917e7c7..cc31f5a7 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -37,6 +37,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.geospatial.processor.FeatureProcessor; @@ -71,7 +72,8 @@ public class GeospatialPluginTests extends OpenSearchTestCase { DatasourceUpdateService.class, DatasourceFacade.class, Ip2GeoExecutor.class, - GeoIpDataFacade.class + GeoIpDataFacade.class, + Ip2GeoLockService.class ); @Mock