Skip to content

Commit

Permalink
Make threat intel source config release lock event driven (#1254)
Browse files Browse the repository at this point in the history
* threat intel release lock event driven

Signed-off-by: Joanne Wang <[email protected]>

* fix release lock for previous threat intel

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>
(cherry picked from commit 890493a)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Aug 20, 2024
1 parent 2e4d666 commit bc85fc4
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,24 @@ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) {
ActionListener.wrap(lock -> {
updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLock(lock),
r -> lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 118 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L118

Added line #L118 was not covered by tests
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},

Check warning on line 121 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L120-L121

Added lines #L120 - L121 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}

Check warning on line 124 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L123-L124

Added lines #L123 - L124 were not covered by tests
)),
e -> {
log.error("Failed to update job parameter " + jobParameter.getName(), e);
lockService.releaseLock(lock);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 128 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L128

Added line #L128 was not covered by tests
response -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
},

Check warning on line 131 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L130-L131

Added lines #L130 - L131 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), jobParameter.getName()), ex);
}

Check warning on line 134 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java#L133-L134

Added lines #L133 - L134 were not covered by tests
));
}
));
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,26 @@ protected Runnable retrieveLockAndUpdateConfig(final SATIFSourceConfig saTifSour
ActionListener.wrap(lock -> {
updateSourceConfigAndIOCs(saTifSourceConfig, lockService.getRenewLockRunnable(new AtomicReference<>(lock)),
ActionListener.wrap(
r -> lockService.releaseLock(lock),
r -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 111 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L111

Added line #L111 was not covered by tests
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L113-L114

Added lines #L113 - L114 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex);
}

Check warning on line 117 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L116-L117

Added lines #L116 - L117 were not covered by tests
));
},

Check warning on line 119 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L119

Added line #L119 was not covered by tests
e -> {
log.error("Failed to update threat intel source config " + saTifSourceConfig.getName(), e);
lockService.releaseLock(lock);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 122 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L122

Added line #L122 was not covered by tests
response -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
},

Check warning on line 125 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L124-L125

Added lines #L124 - L125 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfig.getId()), ex);
}

Check warning on line 128 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFSourceConfigRunner.java#L127-L128

Added lines #L127 - L128 were not covered by tests
));
}
));
}, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,17 @@ public void indexTIFSourceConfig(SATIFSourceConfig saTifSourceConfig,
actionListener.onFailure(e);
}
}, exception -> {
lockService.releaseLock(lock);
log.error("Failed to release lock", exception);
actionListener.onFailure(exception);
log.error("Failed to create threat intel source config index", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 141 in src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java#L140-L141

Added lines #L140 - L141 were not covered by tests
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
actionListener.onFailure(exception);
},

Check warning on line 145 in src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java#L143-L145

Added lines #L143 - L145 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for threat intel source config.", lock.getLockId()), ex);
actionListener.onFailure(exception);
}

Check warning on line 149 in src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java#L147-L149

Added lines #L147 - L149 were not covered by tests
));
});
createJobIndexIfNotExists(createIndexStepListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
ActionListener.wrap(
saTifSourceConfigDtoResponse -> {
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> listener.onResponse(new SAIndexTIFSourceConfigResponse(
saTifSourceConfigDtoResponse.getId(),
saTifSourceConfigDtoResponse.getVersion(),
RestStatus.OK,
saTifSourceConfigDtoResponse
)),
r -> {
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onResponse(new SAIndexTIFSourceConfigResponse(
saTifSourceConfigDtoResponse.getId(),
saTifSourceConfigDtoResponse.getVersion(),
RestStatus.OK,
saTifSourceConfigDtoResponse
));
},
e -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config [%s].", lock.getLockId(), saTifSourceConfigDto.getId()), e);
listener.onResponse(new SAIndexTIFSourceConfigResponse(
Expand All @@ -124,15 +127,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
}
));
}, e -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.error("Failed to create IOCs and threat intel source config", e);
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},
ex -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex);

Check warning on line 138 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java#L138

Added line #L138 was not covered by tests
listener.onFailure(e);
}
));
Expand All @@ -141,16 +144,15 @@ private void retrieveLockAndCreateTIFConfig(SAIndexTIFSourceConfigRequest reques
)
);
} catch (Exception e) {
log.error("listener failed when executing", e);
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);

Check warning on line 148 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java#L148

Added line #L148 was not covered by tests
lockService.releaseLockEventDriven(lock, ActionListener.wrap(
r -> {
log.error("Failed to create IOCs and threat intel source config", e);
log.debug("Released threat intel source config lock with id [{}]", lock.getLockId());

Check warning on line 151 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java#L151

Added line #L151 was not covered by tests
listener.onFailure(e);
},
ex -> {
String action = RestRequest.Method.PUT.equals(request.getMethod()) ? "update" : "create";
log.error(String.format("Failed to %s IOCs and threat intel source config", action), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), e);
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif source config.", lock.getLockId()), ex);

Check warning on line 155 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportIndexTIFSourceConfigAction.java#L155

Added line #L155 was not covered by tests
listener.onFailure(e);
}
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,17 @@ protected void doExecute(final Task task, final PutTIFJobRequest request, final
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
log.error("listener failed when executing", e);
log.error("Failed execution to put tif job action", e);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 110 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L109-L110

Added lines #L109 - L110 were not covered by tests
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(e);
},

Check warning on line 114 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L112-L114

Added lines #L112 - L114 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(e);
}

Check warning on line 118 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L116-L118

Added lines #L116 - L118 were not covered by tests
));
}
}, exception -> {
listener.onFailure(exception);
Expand Down Expand Up @@ -138,9 +146,17 @@ protected void internalDoExecute(
listener.onFailure(e);
}
}, exception -> {
lockService.releaseLock(lock);
log.error("failed to release lock", exception);
listener.onFailure(exception);
log.error("Failed to save tif job parameter", exception);
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 150 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L149-L150

Added lines #L149 - L150 were not covered by tests
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},

Check warning on line 154 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L152-L154

Added lines #L152 - L154 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), request.getName()), ex);
listener.onFailure(exception);
}

Check warning on line 158 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L156-L158

Added lines #L156 - L158 were not covered by tests
));
});
tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener);
}
Expand All @@ -160,22 +176,40 @@ protected ActionListener<IndexResponse> postIndexingTifJobParameter(
createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap(
threatIntelIndicesResponse -> {
if (threatIntelIndicesResponse.isAcknowledged()) {
lockService.releaseLock(lockReference.get());
listener.onResponse(new AcknowledgedResponse(true));
lockService.releaseLockEventDriven(lockReference.get(), ActionListener.wrap(
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onResponse(new AcknowledgedResponse(true));
},
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(ex);
}

Check warning on line 187 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L185-L187

Added lines #L185 - L187 were not covered by tests
));
} else {
listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR));
}
}, listener::onFailure
));
}, e -> {
lockService.releaseLock(lock);
Exception exception;
if (e instanceof VersionConflictEngineException) {
log.error("tifJobParameter already exists");
listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName()));
exception = new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName());

Check warning on line 198 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L198

Added line #L198 was not covered by tests
} else {
log.error("Internal server error");
listener.onFailure(e);
exception = e;

Check warning on line 201 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L201

Added line #L201 was not covered by tests
}
lockService.releaseLockEventDriven(lock, ActionListener.wrap(

Check warning on line 203 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L203

Added line #L203 was not covered by tests
r -> {
log.debug("Released tif job parameter lock with id [{}]", lock.getLockId());
listener.onFailure(exception);
},

Check warning on line 207 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L205-L207

Added lines #L205 - L207 were not covered by tests
ex -> {
log.error(String.format("Unexpected failure while trying to release lock [%s] for tif job parameter [%s].", lock.getLockId(), tifJobParameter.getName()), ex);
listener.onFailure(exception);
}

Check warning on line 211 in src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/transport/TransportPutTIFJobAction.java#L209-L211

Added lines #L209 - L211 were not covered by tests
));
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -59,7 +60,9 @@ public void testReleaseLock_whenValidInput_thenSucceed() {
LOCK_DURATION_IN_SECONDS,
false
);
noOpsLockService.releaseLock(lockModel);
noOpsLockService.releaseLockEventDriven(lockModel, ActionListener.wrap(
Assert::assertFalse, e -> fail()
));
}

public void testRenewLock_whenCalled_thenNotBlocked() {
Expand Down

0 comments on commit bc85fc4

Please sign in to comment.