diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java index f4c41de3..d6c388e4 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/ml/CheckpointDao.java @@ -151,8 +151,10 @@ public CheckpointDao( this.clock = clock; this.checkpointInterval = checkpointInterval; this.indexUtil = indexUtil; + // each checkpoint with model initialized is roughly 250 KB if we are using shingle size 1 with 1 feature + // 1k limit will send 250 KB * 1000 = 250 MB this.maxBulkRequestSize = maxBulkRequestSize; - // 1 bulk request per minute. 1 / 60 seconds = 0. 02 + // 1 bulk request per 1/bulkPerSecond seconds. this.bulkRateLimiter = RateLimiter.create(bulkPerSecond); } @@ -283,6 +285,10 @@ private void flush(BulkRequest bulkRequest) { clientUtil.execute(BulkAction.INSTANCE, bulkRequest, ActionListener.wrap(r -> { if (r.hasFailures()) { requests.addAll(BulkUtil.getIndexRequestToRetry(bulkRequest, r)); + } else if (requests.size() >= maxBulkRequestSize / 2) { + // during maintenance, we may have much more waiting in the queue. + // trigger another flush if that's the case. + flush(); } }, e -> { logger.error("Failed bulking checkpoints", e); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java index 655b1feb..cbf23734 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java @@ -93,7 +93,8 @@ public EntityResultTransportAction( @Override protected void doExecute(Task task, EntityResultRequest request, ActionListener listener) { if (adCircuitBreakerService.isOpen()) { - listener.onFailure(new LimitExceededException(request.getDetectorId(), CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG)); + listener + .onFailure(new LimitExceededException(request.getDetectorId(), CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG, false)); return; }