From 432136171b9042ad3e58d5f68dee05d56bd809bc Mon Sep 17 00:00:00 2001 From: Bhavana Ramaram Date: Tue, 23 Jul 2024 18:16:32 -0500 Subject: [PATCH] add acknowledge check for index creation in missing places Signed-off-by: Bhavana Ramaram --- .../ml/engine/encryptor/EncryptorImpl.java | 105 ++++++++++-------- .../engine/encryptor/EncryptorImplTest.java | 16 +++ .../upload_chunk/MLModelChunkUploader.java | 4 + .../opensearch/ml/cluster/MLSyncUpCron.java | 26 +++-- .../ml/model/MLModelGroupManager.java | 4 + .../opensearch/ml/model/MLModelManager.java | 16 +++ .../MLModelChunkUploaderTests.java | 14 +++ .../ml/model/MLModelGroupManagerTests.java | 17 +++ .../ml/model/MLModelManagerTests.java | 14 +++ 9 files changed, 159 insertions(+), 57 deletions(-) diff --git a/ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java b/ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java index 1c02a7f915..6bf2adfa55 100644 --- a/ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java +++ b/ml-algorithms/src/main/java/org/opensearch/ml/engine/encryptor/EncryptorImpl.java @@ -114,58 +114,65 @@ private void initMasterKey() { CountDownLatch latch = new CountDownLatch(1); mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> { - GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - client.get(getRequest, ActionListener.wrap(getResponse -> { - if (getResponse == null || !getResponse.isExists()) { - IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY); - final String generatedMasterKey = generateMasterKey(); - indexRequest - .source(ImmutableMap.of(MASTER_KEY, generatedMasterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli())); - indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - indexRequest.opType(DocWriteRequest.OpType.CREATE); - client.index(indexRequest, ActionListener.wrap(indexResponse -> { - this.masterKey = generatedMasterKey; - log.info("ML encryption master key initialized successfully"); - latch.countDown(); - }, e -> { - - if (ExceptionUtils.getRootCause(e) instanceof VersionConflictEngineException) { - GetRequest getMasterKeyRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY); - try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { - client.get(getMasterKeyRequest, ActionListener.wrap(getMasterKeyResponse -> { - if (getMasterKeyResponse != null && getMasterKeyResponse.isExists()) { - final String masterKey = (String) getMasterKeyResponse.getSourceAsMap().get(MASTER_KEY); - this.masterKey = masterKey; - log.info("ML encryption master key already initialized, no action needed"); - latch.countDown(); - } else { - exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR)); + if (!r) { + exceptionRef.set(new RuntimeException("No response to create ML Config index")); + latch.countDown(); + } else { + GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse == null || !getResponse.isExists()) { + IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY); + final String generatedMasterKey = generateMasterKey(); + indexRequest + .source(ImmutableMap.of(MASTER_KEY, generatedMasterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli())); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + indexRequest.opType(DocWriteRequest.OpType.CREATE); + client.index(indexRequest, ActionListener.wrap(indexResponse -> { + this.masterKey = generatedMasterKey; + log.info("ML encryption master key initialized successfully"); + latch.countDown(); + }, e -> { + + if (ExceptionUtils.getRootCause(e) instanceof VersionConflictEngineException) { + GetRequest getMasterKeyRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY); + try ( + ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext() + ) { + client.get(getMasterKeyRequest, ActionListener.wrap(getMasterKeyResponse -> { + if (getMasterKeyResponse != null && getMasterKeyResponse.isExists()) { + final String masterKey = (String) getMasterKeyResponse.getSourceAsMap().get(MASTER_KEY); + this.masterKey = masterKey; + log.info("ML encryption master key already initialized, no action needed"); + latch.countDown(); + } else { + exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR)); + latch.countDown(); + } + }, error -> { + log.debug("Failed to get ML encryption master key", e); + exceptionRef.set(error); latch.countDown(); - } - }, error -> { - log.debug("Failed to get ML encryption master key", e); - exceptionRef.set(error); - latch.countDown(); - })); + })); + } + } else { + log.debug("Failed to index ML encryption master key", e); + exceptionRef.set(e); + latch.countDown(); } - } else { - log.debug("Failed to index ML encryption master key", e); - exceptionRef.set(e); - latch.countDown(); - } - })); - } else { - final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY); - this.masterKey = masterKey; - log.info("ML encryption master key already initialized, no action needed"); + })); + } else { + final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY); + this.masterKey = masterKey; + log.info("ML encryption master key already initialized, no action needed"); + latch.countDown(); + } + }, e -> { + log.debug("Failed to get ML encryption master key from config index", e); + exceptionRef.set(e); latch.countDown(); - } - }, e -> { - log.debug("Failed to get ML encryption master key from config index", e); - exceptionRef.set(e); - latch.countDown(); - })); + })); + } } }, e -> { log.debug("Failed to init ML config index", e); diff --git a/ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java b/ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java index bd228fb665..ba3b3a9167 100644 --- a/ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java +++ b/ml-algorithms/src/test/java/org/opensearch/ml/engine/encryptor/EncryptorImplTest.java @@ -499,6 +499,22 @@ public void decrypt_NullMasterKey_GetMasterKey_Exception() { encryptor.decrypt("test"); } + @Test + public void decrypt_NoResponseToInitConfigIndex() { + exceptionRule.expect(RuntimeException.class); + exceptionRule.expectMessage("No response to create ML Config index"); + + doAnswer(invocation -> { + ActionListener actionListener = (ActionListener) invocation.getArgument(0); + actionListener.onResponse(false); + return null; + }).when(mlIndicesHandler).initMLConfigIndex(any()); + + Encryptor encryptor = new EncryptorImpl(clusterService, client, mlIndicesHandler); + Assert.assertNull(encryptor.getMasterKey()); + encryptor.decrypt("test"); + } + @Test public void decrypt_MLConfigIndexNotFound() { exceptionRule.expect(ResourceNotFoundException.class); diff --git a/plugin/src/main/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploader.java b/plugin/src/main/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploader.java index 683536e21e..ff912ccda0 100644 --- a/plugin/src/main/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploader.java +++ b/plugin/src/main/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploader.java @@ -103,6 +103,10 @@ public void uploadModelChunk(MLUploadModelChunkInput uploadModelChunkInput, Acti throw new Exception("Chunk size exceeds 10MB"); } mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> { + if (!res) { + wrappedListener.onFailure(new RuntimeException("No response to create ML Model index")); + return; + } int chunkNum = uploadModelChunkInput.getChunkNumber(); MLModel mlModel = MLModel .builder() diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index 44d75638f4..07098996c8 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -186,10 +186,13 @@ public void run() { return; } // refresh model status - mlIndicesHandler - .initModelIndexIfAbsent(ActionListener.wrap(res -> { refreshModelState(modelWorkerNodes, deployingModels); }, e -> { - log.error("Failed to init model index", e); - })); + mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> { + if (!res) { + log.error("No response to create ML model index"); + return; + } + refreshModelState(modelWorkerNodes, deployingModels); + }, e -> { log.error("Failed to init model index", e); })); }, ex -> { log.error("Failed to sync model routing", ex); })); }, e -> { log.error("Failed to sync model routing", e); })); } @@ -211,10 +214,13 @@ private void undeployExpiredModels( log.debug("Received failures in undeploying expired models", mlUndeployModelNodesResponse.failures()); } - mlIndicesHandler - .initModelIndexIfAbsent(ActionListener.wrap(res -> { refreshModelState(modelWorkerNodes, deployingModels); }, e -> { - log.error("Failed to init model index", e); - })); + mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> { + if (!res) { + log.error("No response to create ML model index"); + return; + } + refreshModelState(modelWorkerNodes, deployingModels); + }, e -> { log.error("Failed to init model index", e); })); }, e -> { log.error("Failed to undeploy models {}", expiredModels, e); })); } @@ -224,6 +230,10 @@ void initMLConfig() { return; } mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> { + if (!r) { + log.debug("Failed to initialize or update ML Config index"); + return; + } GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { client.get(getRequest, ActionListener.wrap(getResponse -> { diff --git a/plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java b/plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java index 2187a4577e..73c48e776a 100644 --- a/plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java +++ b/plugin/src/main/java/org/opensearch/ml/model/MLModelGroupManager.java @@ -112,6 +112,10 @@ public void createModelGroup(MLRegisterModelGroupInput input, ActionListener { + if (!res) { + wrappedListener.onFailure(new RuntimeException("No response to create ML Model Group index")); + return; + } IndexRequest indexRequest = new IndexRequest(ML_MODEL_GROUP_INDEX); indexRequest .source( diff --git a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java index daccdf1569..32378ef7bf 100644 --- a/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java +++ b/plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java @@ -292,6 +292,10 @@ private void uploadMLModelMeta(MLRegisterModelMetaInput mlRegisterModelMetaInput ActionListener wrappedListener = ActionListener.runBefore(listener, () -> context.restore()); String modelName = mlRegisterModelMetaInput.getName(); mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(res -> { + if (!res) { + wrappedListener.onFailure(new RuntimeException("No response to create ML Model index")); + return; + } Instant now = Instant.now(); MLModel mlModelMeta = MLModel .builder() @@ -528,6 +532,10 @@ private void indexRemoteModel( } mlIndicesHandler.initModelIndexIfAbsent(ActionListener.wrap(boolResponse -> { + if (!boolResponse) { + listener.onFailure(new RuntimeException("No response to create ML Model index")); + return; + } MLModel mlModelMeta = MLModel .builder() .name(modelName) @@ -596,6 +604,10 @@ void indexRemoteModel(MLRegisterModelInput registerModelInput, MLTask mlTask, St registerModelInput.getConnector().encrypt(mlEngine::encrypt); } mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(res -> { + if (!res) { + handleException(functionName, taskId, new RuntimeException("No response to create ML Model index")); + return; + } MLModel mlModelMeta = MLModel .builder() .name(modelName) @@ -666,6 +678,10 @@ private void registerModelFromUrl(MLRegisterModelInput registerModelInput, MLTas String modelGroupId = registerModelInput.getModelGroupId(); Instant now = Instant.now(); mlIndicesHandler.initModelIndexIfAbsent(ActionListener.runBefore(ActionListener.wrap(res -> { + if (!res) { + handleException(functionName, taskId, new RuntimeException("No response to create ML Model index")); + return; + } MLModel mlModelMeta = MLModel .builder() .name(modelName) diff --git a/plugin/src/test/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploaderTests.java b/plugin/src/test/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploaderTests.java index 292183f8ed..adb65eabee 100644 --- a/plugin/src/test/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploaderTests.java +++ b/plugin/src/test/java/org/opensearch/ml/action/upload_chunk/MLModelChunkUploaderTests.java @@ -151,6 +151,20 @@ public void testUploadModelChunk() { verify(actionListener).onResponse(argumentCaptor.capture()); } + public void test_NoResponseInitModelIndex() { + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(0); + actionListener.onResponse(false); + return null; + }).when(mlIndicesHandler).initModelIndexIfAbsent(any()); + + MLUploadModelChunkInput uploadModelChunkInput = prepareRequest(); + mlModelChunkUploader.uploadModelChunk(uploadModelChunkInput, actionListener); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); + verify(actionListener).onFailure(argumentCaptor.capture()); + assertEquals("No response to create ML Model index", argumentCaptor.getValue().getMessage()); + } + private MLUploadModelChunkInput prepareRequest() { final byte[] content = new byte[] { 1, 2, 3, 4 }; MLUploadModelChunkInput input = MLUploadModelChunkInput.builder().chunkNumber(0).modelId("someModelId").content(content).build(); diff --git a/plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java b/plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java index ce01b44026..f284f54a5e 100644 --- a/plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/model/MLModelGroupManagerTests.java @@ -389,6 +389,23 @@ public void test_NotFoundGetModelGroup() throws IOException { assertEquals("Failed to find model group with ID: testModelGroupID", argumentCaptor.getValue().getMessage()); } + public void test_NoResponseoInitModelGroup() throws IOException { + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(0); + actionListener.onResponse(false); + return null; + }).when(mlIndicesHandler).initModelGroupIndexIfAbsent(any()); + + when(modelAccessControlHelper.isSecurityEnabledAndModelAccessControlEnabled(any())).thenReturn(false); + + MLRegisterModelGroupInput mlRegisterModelGroupInput = prepareRequest(null, null, null); + mlModelGroupManager.createModelGroup(mlRegisterModelGroupInput, actionListener); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); + verify(actionListener).onFailure(argumentCaptor.capture()); + assertEquals("No response to create ML Model Group index", argumentCaptor.getValue().getMessage()); + } + private MLRegisterModelGroupInput prepareRequest(List backendRoles, AccessMode modelAccessMode, Boolean isAddAllBackendRoles) { return MLRegisterModelGroupInput .builder() diff --git a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java index 01aee3fa04..8868aaa0b4 100644 --- a/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/model/MLModelManagerTests.java @@ -1158,6 +1158,20 @@ public void testRegisterModelMeta_FailedToInitIndexIfPresent() { verify(actionListener).onFailure(argumentCaptor.capture()); } + public void testRegisterModelMeta_NoResponseToInitIndex() { + setupForModelMeta(); + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(0); + actionListener.onResponse(false); + return null; + }).when(mlIndicesHandler).initModelIndexIfAbsent(any()); + MLRegisterModelMetaInput mlUploadInput = prepareRequest(); + modelManager.registerModelMeta(mlUploadInput, actionListener); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); + verify(actionListener).onFailure(argumentCaptor.capture()); + assertEquals("No response to create ML Model index", argumentCaptor.getValue().getMessage()); + } + public void test_trackPredictDuration_sync() { Supplier mockResult = () -> { try {