From 2047ae6f3e328da0294236ac465a22bd0b50683b Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 4 May 2023 20:16:49 -0700 Subject: [PATCH] Add wrapper class of job scheduler lock service Signed-off-by: Heemin Kim --- .../ip2geo/common/Ip2GeoLockService.java | 98 +++++++++++++++++++ .../ip2geo/common/Ip2GeoLockServiceTests.java | 46 +++++++++ 2 files changed, 144 insertions(+) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java new file mode 100644 index 00000000..cd41251a --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.opensearch.action.ActionListener; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.jobscheduler.spi.utils.LockService; + +/** + * A wrapper of job scheduler's lock service for datasource + */ +public class Ip2GeoLockService { + private final ClusterService clusterService; + private final Client client; + private final LockService lockService; + + /** + * Constructor + * + * @param clusterService the cluster service + * @param client the client + */ + @Inject + public Ip2GeoLockService(final ClusterService clusterService, final Client client) { + this.clusterService = clusterService; + this.client = client; + this.lockService = new LockService(client, clusterService); + } + + /** + * Wrapper method of LockService#acquireLockWithId + * + * Datasource use its name as doc id in job scheduler. Therefore, we can use datasource name to acquire + * a lock on a datasource. + * + * @param datasourceName datasourceName to acquire lock on + * @param lockDurationSeconds the lock duration in seconds + * @param listener the listener + */ + public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener listener) { + lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener); + } + + /** + * Wrapper method of LockService#release + * + * @param lockModel the lock model + * @param listener the listener + */ + public void releaseLock(final LockModel lockModel, final ActionListener listener) { + lockService.release(lockModel, listener); + } + + /** + * Synchronous method of LockService#renewLock + * + * @param lockModel lock to renew + * @param timeout timeout in milliseconds precise + * @return renewed lock if renew succeed and null otherwise + */ + public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) { + AtomicReference lockReference = new AtomicReference(); + CountDownLatch countDownLatch = new CountDownLatch(1); + lockService.renewLock(lockModel, new ActionListener<>() { + @Override + public void onResponse(final LockModel lockModel) { + lockReference.set(lockModel); + countDownLatch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + lockReference.set(null); + countDownLatch.countDown(); + } + }); + + try { + countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS); + return lockReference.get(); + } catch (InterruptedException e) { + return null; + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java new file mode 100644 index 00000000..a746f314 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockServiceTests.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import static org.mockito.Mockito.mock; + +import java.time.Instant; + +import org.junit.Before; +import org.opensearch.action.ActionListener; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class Ip2GeoLockServiceTests extends Ip2GeoTestCase { + private Ip2GeoLockService ip2GeoLockService; + + @Before + public void init() { + ip2GeoLockService = new Ip2GeoLockService(clusterService, client); + } + + public void testAcquireLock_whenValidInput_thenSucceed() { + // Cannot test because LockService is final class + // Simply calling method to increase coverage + ip2GeoLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class)); + } + + public void testReleaseLock_whenValidInput_thenSucceed() { + // Cannot test because LockService is final class + // Simply calling method to increase coverage + ip2GeoLockService.releaseLock(null, mock(ActionListener.class)); + } + + public void testRenewLock_whenCalled_thenNotBlocked() { + long timeoutInMillis = 10000; + long expectedDurationInMillis = 1000; + Instant before = Instant.now(); + assertNull(ip2GeoLockService.renewLock(null, TimeValue.timeValueMillis(timeoutInMillis))); + Instant after = Instant.now(); + assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); + } +}