From 5a9ea810d6a1f55cec6fdcf693a3d109ce3d80fc Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 20 Dec 2018 15:53:44 +0000 Subject: [PATCH] [ML] Wait for job updates in AutoDetectResultProcessor.awaitCompletion (#36856) There was a race where the job update in `AutoDetectResultProcessor.updateEstablishedModelMemoryOnJob` could execute after `AutoDetectResultProcessor.awaitCompletion` returned. This was because `jobUpdateSemaphore` was acquired only after the call to `jobResultsProvider.getEstablishedMemoryUsage` had delivered its result; while that call was in flight, `awaitCompletion` was free to acquire and release the semaphore and then return. This commit fixes the problem by acquiring the semaphore before calling `getEstablishedMemoryUsage` and releasing it once the job update completes, fails, cannot be submitted, or turns out to be unnecessary. Closes #36849 --- .../output/AutoDetectResultProcessor.java | 72 +++++++++++-------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 28d6d76eb8395..ab45c7f6017c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -432,41 +432,51 @@ private void updateEstablishedModelMemoryOnJob() { // We need to make all results written up to and including these stats available for the established memory calculation persister.commitResultWrites(jobId); + try { + jobUpdateSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId); + return; + } + jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - 
client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> { - try { - jobUpdateSemaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId); - return; - } - - JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - jobUpdateSemaphore.release(); - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } - - @Override - public void onFailure(Exception e) { - jobUpdateSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + - establishedModelMemory + "]", e); - } - }); - }); + try { + client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { + JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + jobUpdateSemaphore.release(); + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to 
update job with new established model memory [" + + establishedModelMemory + "]", e); + } + }); + }); + } catch (Exception e) { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] error submitting established model memory update action", e); + } + } else { + jobUpdateSemaphore.release(); } - }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); + }, e -> { + jobUpdateSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e); + }); } public void awaitCompletion() throws TimeoutException {