Skip to content

Commit

Permalink
fix release lock for previous threat intel
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon committed Aug 16, 2024
1 parent 7c18019 commit 78b3351
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,24 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) {
ActionListener.wrap(lock -> {
updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLock(lock),
r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}
)),
e -> {
log.error("Failed to update job parameter " + jobParameter.getName(), e);
lockService.releaseLock(lock);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}
));
}
));
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,17 @@ protected void doExecute(final Task task, final PutTIFJobRequest request, final
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
log.error("listener failed when executing", e);
log.error("Failed execution to put tif job action", e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(e);
}
));
}
}, exception -> {
listener.onFailure(exception);
Expand Down Expand Up @@ -138,9 +146,17 @@ protected void internalDoExecute(
listener.onFailure(e);
}
}, exception -> {
lockService.releaseLock(lock);
log.error("failed to release lock", exception);
listener.onFailure(exception);
log.error("Failed to save tif job parameter", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(exception);
}
));
});
tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener);
}
Expand All @@ -160,22 +176,40 @@ protected ActionListener<IndexResponse> postIndexingTifJobParameter(
createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap(
threatIntelIndicesResponse -> {
if (threatIntelIndicesResponse.isAcknowledged()) {
lockService.releaseLock(lockReference.get());
listener.onResponse(new AcknowledgedResponse(true));
lockService.releaseLockEventDriven(lockReference.get(), ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onResponse(new AcknowledgedResponse(true));
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(ex);
}
));
} else {
listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR));
}
}, listener::onFailure
));
}, e -> {
lockService.releaseLock(lock);
Exception exception;
if (e instanceof VersionConflictEngineException) {
log.error("tifJobParameter already exists");
listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName()));
exception = new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName());
} else {
log.error("Internal server error");
listener.onFailure(e);
exception = e;
}
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(exception);
}
));
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -59,7 +60,9 @@ public void testReleaseLock_whenValidInput_thenSucceed() {
LOCK_DURATION_IN_SECONDS,
false
);
noOpsLockService.releaseLock(lockModel);
noOpsLockService.releaseLockEventDriven(lockModel, ActionListener.wrap(
Assert::assertFalse, e -> fail()
));
}

public void testRenewLock_whenCalled_thenNotBlocked() {
Expand Down

0 comments on commit 78b3351

Please sign in to comment.