From 918f7513bbc7087715a2d5bcff565736c56a0d38 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Mon, 15 May 2023 14:35:01 -0700 Subject: [PATCH] Add ConcurrentModificationException Signed-off-by: Heemin Kim --- .../ConcurrentModificationException.java | 37 +++++++++++++++++++ .../DeleteDatasourceTransportAction.java | 9 ++--- .../action/PutDatasourceTransportAction.java | 15 ++++---- .../UpdateDatasourceTransportAction.java | 18 ++++----- .../ConcurrentModificationExceptionTests.java | 16 ++++++++ .../geospatial/ip2geo/Ip2GeoTestCase.java | 2 +- 6 files changed, 74 insertions(+), 23 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java create mode 100644 src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java diff --git a/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java b/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java new file mode 100644 index 00000000..579f22f4 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import java.io.IOException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +/** + * General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code + * + * The exception is thrown when multiple mutation API is called for a same resource at the same time + */ +public class ConcurrentModificationException extends OpenSearchException { + + public ConcurrentModificationException(String msg, Object... args) { + super(msg, args); + } + + public ConcurrentModificationException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + + public ConcurrentModificationException(StreamInput in) throws IOException { + super(in); + } + + @Override + public final RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 753c107d..e187f243 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -5,11 +5,7 @@ package org.opensearch.geospatial.ip2geo.action; -import java.io.IOException; - import lombok.extern.log4j.Log4j2; - -import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; @@ -17,6 +13,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.ResourceInUseException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -27,6 +24,8 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.io.IOException; + /** * Transport action to delete datasource */ @@ -73,7 +72,7 @@ public DeleteDatasourceTransportAction( protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")); return; } try { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index fa3fcf48..36e2aa6a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -5,14 +5,7 @@ package org.opensearch.geospatial.ip2geo.action; -import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; - -import java.time.Instant; -import java.util.concurrent.atomic.AtomicReference; - import lombok.extern.log4j.Log4j2; - -import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -22,6 +15,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -33,6 +27,11 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; + +import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; + /** * Transport action to create datasource */ @@ -72,7 +71,7 @@ public PutDatasourceTransportAction( protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")); return; } try { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index 20634552..20375d4d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -5,15 +5,7 @@ package org.opensearch.geospatial.ip2geo.action; -import java.io.IOException; -import java.net.URL; -import java.security.InvalidParameterException; -import java.time.temporal.ChronoUnit; -import java.util.List; -import java.util.Locale; - import lombok.extern.log4j.Log4j2; - import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; @@ -21,6 +13,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -30,6 +23,13 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.io.IOException; +import java.net.URL; +import java.security.InvalidParameterException; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; + /** * Transport action to update datasource */ @@ -74,7 +74,7 @@ public UpdateDatasourceTransportAction( protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")); return; } try { diff --git a/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java b/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java new file mode 100644 index 00000000..53e6ce10 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +public class ConcurrentModificationExceptionTests extends OpenSearchTestCase { + public void testStatusCode() { + ConcurrentModificationException exception = new ConcurrentModificationException("Resource is being modified by another processor"); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 4e76d6d4..931dfdc1 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -183,7 +183,7 @@ protected Datasource randomDatasource() { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS)); + datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS)); datasource.setState(randomState()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));