From 78b3351a8a5c97cb3bd0afec15843d732b53cc5d Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Fri, 16 Aug 2024 13:54:47 -0700 Subject: [PATCH] fix release lock for previous threat intel Signed-off-by: Joanne Wang --- .../jobscheduler/TIFJobRunner.java | 18 +++++- .../transport/TransportPutTIFJobAction.java | 56 +++++++++++++++---- .../common/ThreatIntelLockServiceTests.java | 5 +- 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java index 65d7e46e5..9e9022294 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java @@ -115,10 +115,24 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) { ActionListener.wrap(lock -> { updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), ActionListener.wrap( - r -> lockService.releaseLock(lock), + r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex); + } + )), e -> { log.error("Failed to update job parameter " + jobParameter.getName(), e); - lockService.releaseLock(lock); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex); + } + )); } )); }, e -> { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java index 41bad5b1a..a72844e40 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java @@ -106,9 +106,17 @@ protected void doExecute(final Task task, final PutTIFJobRequest request, final try { internalDoExecute(request, lock, listener); } catch (Exception e) { - lockService.releaseLock(lock); - listener.onFailure(e); - log.error("listener failed when executing", e); + log.error("Failed execution to put tif job action", e); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(e); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex); + listener.onFailure(e); + } + )); } }, exception -> { listener.onFailure(exception); @@ -138,9 +146,17 @@ protected void internalDoExecute( listener.onFailure(e); } }, exception -> { - lockService.releaseLock(lock); - log.error("failed to release lock", exception); - listener.onFailure(exception); + log.error("Failed to save tif job parameter", exception); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(exception); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex); + listener.onFailure(exception); + } + )); }); tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener); } @@ -160,22 +176,40 @@ protected ActionListener postIndexingTifJobParameter( createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap( threatIntelIndicesResponse -> { if (threatIntelIndicesResponse.isAcknowledged()) { - lockService.releaseLock(lockReference.get()); - listener.onResponse(new AcknowledgedResponse(true)); + lockService.releaseLockEventDriven(lockReference.get(), ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onResponse(new AcknowledgedResponse(true)); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex); + listener.onFailure(ex); + } + )); } else { listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR)); } }, listener::onFailure )); }, e -> { - lockService.releaseLock(lock); + Exception exception; if (e instanceof VersionConflictEngineException) { log.error("tifJobParameter already exists"); - listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName())); + exception = new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName()); } else { log.error("Internal server error"); - listener.onFailure(e); + exception = e; } + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(exception); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex); + listener.onFailure(exception); + } + )); } ); } diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java index ba6f54926..ec6041322 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java @@ -12,6 +12,7 @@ import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; import org.junit.Before; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.update.UpdateRequest; @@ -59,7 +60,9 @@ public void testReleaseLock_whenValidInput_thenSucceed() { LOCK_DURATION_IN_SECONDS, false ); - noOpsLockService.releaseLock(lockModel); + noOpsLockService.releaseLockEventDriven(lockModel, ActionListener.wrap( + Assert::assertFalse, e -> fail() + )); } public void testRenewLock_whenCalled_thenNotBlocked() {