This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Auto flush checkpoint queue if too many are waiting (#279)
Our performance testing finds that our checkpoint queue can grow quickly.  This can happen during maintenance if there are a lot of entities in the cache and very few cache swap-outs have happened in the past hour.  When an entity's state is swapped out, we try to save a checkpoint.  If we haven't saved one for an entity within the past hour, we put the checkpoint into a buffer and flush the buffer at the end of maintenance.  Since we only flush the first 1,000 queued requests to disk, many requests may still wait in the queue until the next flush happens.  This is not ideal and can exhaust memory.

This PR triggers another flush after the previous flush finishes if there are a lot of queued requests.
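
A minimal, self-contained sketch of the flush-chaining idea (the class, queue, and persist() helper here are hypothetical simplifications; the actual change lives in CheckpointDao.flush, shown in the diff below):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    public class CheckpointQueueSketch {
        private final Queue<String> requests = new ArrayDeque<>();
        private final int maxBulkRequestSize;

        public CheckpointQueueSketch(int maxBulkRequestSize) {
            this.maxBulkRequestSize = maxBulkRequestSize;
        }

        public void enqueue(String checkpoint) {
            requests.add(checkpoint);
        }

        // Drain up to maxBulkRequestSize requests into one batch; once the batch
        // completes, chain another flush if at least half a batch is still waiting.
        public void flush() {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxBulkRequestSize && !requests.isEmpty()) {
                batch.add(requests.poll());
            }
            if (batch.isEmpty()) {
                return;
            }
            persist(batch, () -> {
                if (requests.size() >= maxBulkRequestSize / 2) {
                    flush();
                }
            });
        }

        // Stand-in for the asynchronous bulk write; calls onSuccess when done.
        private void persist(List<String> batch, Runnable onSuccess) {
            System.out.println("persisted " + batch.size() + " checkpoints");
            onSuccess.run();
        }

        public static void main(String[] args) {
            CheckpointQueueSketch queue = new CheckpointQueueSketch(1000);
            for (int i = 0; i < 2500; i++) {
                queue.enqueue("checkpoint-" + i);
            }
            queue.flush(); // 1000 flushed, then two chained flushes drain the rest
        }
    }

Chaining only when at least half a batch is still queued avoids triggering an extra flush for a handful of leftover requests.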

This PR also corrects the LimitExceededException thrown when a circuit breaker is open: previously, we sent a LimitExceededException that stopped the detector immediately, leaving the detector no room to recover.  This PR fixes that by setting the LimitExceededException's stop-now flag to false, giving the detector a few more intervals to recover.
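
A hedged sketch of that circuit-breaker guard; the nested LimitExceededException and CircuitBreaker types below are simplified stand-ins for illustration, while the three-argument constructor with a trailing stop-now flag mirrors the diff further down:

    public class CircuitBreakerGuardSketch {

        // Minimal stand-in for the project's LimitExceededException; the third
        // constructor argument mirrors the stop-now flag changed in this PR.
        static class LimitExceededException extends RuntimeException {
            final boolean stopNow;

            LimitExceededException(String detectorId, String message, boolean stopNow) {
                super(detectorId + ": " + message);
                this.stopNow = stopNow;
            }
        }

        interface CircuitBreaker {
            boolean isOpen();
        }

        static void handleEntityResults(CircuitBreaker breaker, String detectorId) {
            if (breaker.isOpen()) {
                // stopNow = false: memory pressure may be transient, so give the
                // detector a few more intervals to recover instead of stopping it.
                throw new LimitExceededException(detectorId, "memory circuit is broken", false);
            }
            System.out.println("processing entity results for " + detectorId);
        }

        public static void main(String[] args) {
            handleEntityResults(() -> false, "detector-1"); // closed breaker: proceeds
            try {
                handleEntityResults(() -> true, "detector-1"); // open breaker: soft failure
            } catch (LimitExceededException e) {
                System.out.println("limit exceeded, stopNow=" + e.stopNow);
            }
        }
    }
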
kaituo authored Oct 19, 2020
1 parent 9a8d1f2 commit 1c02172
Showing 2 changed files with 9 additions and 2 deletions.
@@ -151,8 +151,10 @@ public CheckpointDao(
this.clock = clock;
this.checkpointInterval = checkpointInterval;
this.indexUtil = indexUtil;
// each checkpoint with model initialized is roughly 250 KB if we are using shingle size 1 with 1 feature
// 1k limit will send 250 KB * 1000 = 250 MB
this.maxBulkRequestSize = maxBulkRequestSize;
// 1 bulk request per minute. 1 / 60 seconds = 0.02
// 1 bulk request per 1/bulkPerSecond seconds.
this.bulkRateLimiter = RateLimiter.create(bulkPerSecond);
}

@@ -283,6 +285,10 @@ private void flush(BulkRequest bulkRequest) {
clientUtil.<BulkRequest, BulkResponse>execute(BulkAction.INSTANCE, bulkRequest, ActionListener.wrap(r -> {
if (r.hasFailures()) {
requests.addAll(BulkUtil.getIndexRequestToRetry(bulkRequest, r));
} else if (requests.size() >= maxBulkRequestSize / 2) {
// during maintenance, we may have much more waiting in the queue.
// trigger another flush if that's the case.
flush();
}
}, e -> {
logger.error("Failed bulking checkpoints", e);
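
The constructor hunk above also throttles bulk writes via RateLimiter.create(bulkPerSecond).  A small sketch of how a Guava RateLimiter behaves, assuming an illustrative 0.02 permits-per-second setting (roughly one bulk request every 50 seconds; the value is not taken from this PR's configuration):

    import com.google.common.util.concurrent.RateLimiter;

    public class BulkRateLimiterSketch {
        public static void main(String[] args) {
            double bulkPerSecond = 0.02; // illustrative value only
            RateLimiter bulkRateLimiter = RateLimiter.create(bulkPerSecond);

            for (int i = 0; i < 3; i++) {
                // tryAcquire() returns immediately: true if a permit is available,
                // false if this bulk request should be skipped or deferred.
                // acquire() would instead block until a permit frees up.
                boolean permitted = bulkRateLimiter.tryAcquire();
                System.out.println("bulk attempt " + i + " permitted: " + permitted);
            }
        }
    }
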
@@ -93,7 +93,8 @@ public EntityResultTransportAction(
@Override
protected void doExecute(Task task, EntityResultRequest request, ActionListener<AcknowledgedResponse> listener) {
if (adCircuitBreakerService.isOpen()) {
listener.onFailure(new LimitExceededException(request.getDetectorId(), CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG));
listener
.onFailure(new LimitExceededException(request.getDetectorId(), CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG, false));
return;
}

