diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java index 65d7e46e5..9e9022294 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java @@ -115,10 +115,24 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) { ActionListener.wrap(lock -> { updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), ActionListener.wrap( - r -> lockService.releaseLock(lock), + r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex); + } + )), e -> { log.error("Failed to update job parameter " + jobParameter.getName(), e); - lockService.releaseLock(lock); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex); + } + )); } )); }, e -> { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java index 2b729f0f4..fc01bbad7 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java @@ -107,10 +107,26 @@ protected Runnable retrieveLockAndUpdateConfig(final SATIFSourceConfig saTifSour ActionListener.wrap(lock -> { updateSourceConfigAndIOCs(saTifSourceConfig, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), ActionListener.wrap( - r -> lockService.releaseLock(lock), + r -> { + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex); + } + )); + }, e -> { log.error("Failed to update threat intel source config " + saTifSourceConfig.getName(), e); - lockService.releaseLock(lock); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + response -> { + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex); + } + )); } )); }, e -> { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java index ed58e3371..bf5722a0f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java @@ -137,9 +137,17 @@ public void indexTIFSourceConfig(SATIFSourceConfig saTifSourceConfig, actionListener.onFailure(e); } }, exception -> { - lockService.releaseLock(lock); - log.error("Failed to release lock", exception); - actionListener.onFailure(exception); + log.error("Failed to create threat intel source config index", exception); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); + actionListener.onFailure(exception); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for threat intel source config.", lock.getLockId()), ex); + actionListener.onFailure(exception); + } + )); }); createJobIndexIfNotExists(createIndexStepListener); } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java index d12e4542e..ecdeaa683 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java @@ -107,12 +107,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques ActionListener.wrap( saTifSourceConfigDtoResponse -> { lockService.releaseLockEventDriven(lock, ActionListener.wrap( - r -> listener.onResponse(new SAIndexTIFSourceConfigResponse( - saTifSourceConfigDtoResponse.getId(), - saTifSourceConfigDtoResponse.getVersion(), - RestStatus.OK, - saTifSourceConfigDtoResponse - )), + r -> { + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); + listener.onResponse(new SAIndexTIFSourceConfigResponse( + saTifSourceConfigDtoResponse.getId(), + saTifSourceConfigDtoResponse.getVersion(), + RestStatus.OK, + saTifSourceConfigDtoResponse + )); + }, e -> { log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfigDto.getId()), e); listener.onResponse(new SAIndexTIFSourceConfigResponse( @@ -124,15 +127,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques } )); }, e -> { + String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; + log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); lockService.releaseLockEventDriven(lock, ActionListener.wrap( r -> { - log.error("Failed to create IOCs and threat intel source config", e); + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); listener.onFailure(e); }, ex -> { - String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; - log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); - log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e); + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex); listener.onFailure(e); } )); @@ -141,16 +144,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques ) ); } catch (Exception e) { - log.error("listener failed when executing", e); + String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; + log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); lockService.releaseLockEventDriven(lock, ActionListener.wrap( r -> { - log.error("Failed to create IOCs and threat intel source config", e); + log.debug("Released threat intel source config lock with id [{}]", lock.getLockId()); listener.onFailure(e); }, ex -> { - String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create"; - log.error(String.format("Failed to %s IOCs and threat intel source config", action), e); - log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e); + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex); listener.onFailure(e); } )); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java index 41bad5b1a..a72844e40 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java @@ -106,9 +106,17 @@ protected void doExecute(final Task task, final PutTIFJobRequest request, final try { internalDoExecute(request, lock, listener); } catch (Exception e) { - lockService.releaseLock(lock); - listener.onFailure(e); - log.error("listener failed when executing", e); + log.error("Failed execution to put tif job action", e); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(e); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex); + listener.onFailure(e); + } + )); } }, exception -> { listener.onFailure(exception); @@ -138,9 +146,17 @@ protected void internalDoExecute( listener.onFailure(e); } }, exception -> { - lockService.releaseLock(lock); - log.error("failed to release lock", exception); - listener.onFailure(exception); + log.error("Failed to save tif job parameter", exception); + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(exception); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex); + listener.onFailure(exception); + } + )); }); tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener); } @@ -160,22 +176,40 @@ protected ActionListener postIndexingTifJobParameter( createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap( threatIntelIndicesResponse -> { if (threatIntelIndicesResponse.isAcknowledged()) { - lockService.releaseLock(lockReference.get()); - listener.onResponse(new AcknowledgedResponse(true)); + lockService.releaseLockEventDriven(lockReference.get(), ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onResponse(new AcknowledgedResponse(true)); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex); + listener.onFailure(ex); + } + )); } else { listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR)); } }, listener::onFailure )); }, e -> { - lockService.releaseLock(lock); + Exception exception; if (e instanceof VersionConflictEngineException) { log.error("tifJobParameter already exists"); - listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName())); + exception = new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName()); } else { log.error("Internal server error"); - listener.onFailure(e); + exception = e; } + lockService.releaseLockEventDriven(lock, ActionListener.wrap( + r -> { + log.debug("Released tif job parameter lock with id [{}]", lock.getLockId()); + listener.onFailure(exception); + }, + ex -> { + log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex); + listener.onFailure(exception); + } + )); } ); } diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java index ba6f54926..ec6041322 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java @@ -12,6 +12,7 @@ import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; import org.junit.Before; import org.opensearch.action.DocWriteResponse; import org.opensearch.action.update.UpdateRequest; @@ -59,7 +60,9 @@ public void testReleaseLock_whenValidInput_thenSucceed() { LOCK_DURATION_IN_SECONDS, false ); - noOpsLockService.releaseLock(lockModel); + noOpsLockService.releaseLockEventDriven(lockModel, ActionListener.wrap( + Assert::assertFalse, e -> fail() + )); } public void testRenewLock_whenCalled_thenNotBlocked() {