From 5450544fba3ef619f2656d095bbc3e33043215a6 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Wed, 18 Dec 2024 12:03:23 -0800 Subject: [PATCH] Pass tenantId to IndicesHandler and use in EncryptorUtils Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 5 +- .../CreateWorkflowTransportAction.java | 2 +- .../flowframework/util/EncryptorUtils.java | 55 +++++++++++-------- .../CreateWorkflowTransportActionTests.java | 30 ++++------ .../util/EncryptorUtilsTests.java | 48 ++++++++-------- 5 files changed, 74 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 0dd2d934..72df5600 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -359,15 +359,16 @@ public void putTemplateToGlobalContext(Template template, ActionListener listener) { + public void initializeConfigIndex(String tenantId, ActionListener listener) { initConfigIndexIfAbsent(ActionListener.wrap(indexCreated -> { if (!indexCreated) { listener.onFailure(new FlowFrameworkException("No response to create config index", INTERNAL_SERVER_ERROR)); return; } - encryptorUtils.initializeMasterKey(listener); + encryptorUtils.initializeMasterKey(tenantId, listener); }, createIndexException -> { logger.error("Failed to create config index"); listener.onFailure(createIndexException); diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 3d87fcd4..8d4e62e6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -268,7 +268,7 @@ private void createExecute(WorkflowRequest request, User user, String tenantId, return; } else { // Initialize config index and create new global context and state index entries - flowFrameworkIndicesHandler.initializeConfigIndex(ActionListener.wrap(isInitialized -> { + flowFrameworkIndicesHandler.initializeConfigIndex(tenantId, ActionListener.wrap(isInitialized -> { if (FALSE.equals(isInitialized)) { listener.onFailure( new FlowFrameworkException("Failed to initalize config index", RestStatus.INTERNAL_SERVER_ERROR) diff --git a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java index 5748c7ae..1038475d 100644 --- a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java @@ -16,6 +16,7 @@ import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.commons.authuser.User; @@ -42,7 +43,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CommitmentPolicy; @@ -67,10 +70,13 @@ public class EncryptorUtils { // https://github.com/aws/aws-encryption-sdk-java/issues/1879 private static final String WRAPPING_ALGORITHM = "AES/GCM/NOPADDING"; + // concurrent map can't have null as a key. This key is to support single tenancy + public static final String DEFAULT_TENANT_ID = "03000200-0400-0500-0006-000700080009"; + private final ClusterService clusterService; private final Client client; private final SdkClient sdkClient; - private String masterKey; + private final Map tenantMasterKeys; private final NamedXContentRegistry xContentRegistry; /** @@ -80,7 +86,7 @@ public class EncryptorUtils { * @param xContentRegistry the OpenSearch XContent Registry */ public EncryptorUtils(ClusterService clusterService, Client client, SdkClient sdkClient, NamedXContentRegistry xContentRegistry) { - this.masterKey = null; + this.tenantMasterKeys = new ConcurrentHashMap<>(); this.clusterService = clusterService; this.client = client; this.sdkClient = sdkClient; @@ -89,18 +95,20 @@ public EncryptorUtils(ClusterService clusterService, Client client, SdkClient sd /** * Sets the master key + * @param tenantId The tenant id. If null, sets the key for the default id. * @param masterKey the master key */ - void setMasterKey(String masterKey) { - this.masterKey = masterKey; + void setMasterKey(@Nullable String tenantId, String masterKey) { + this.tenantMasterKeys.put(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID), masterKey); } /** * Returns the master key + * @param tenantId The tenant id. If null, gets the key for the default id. * @return the master key */ - String getMasterKey() { - return this.masterKey; + String getMasterKey(@Nullable String tenantId) { + return tenantMasterKeys.get(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID)); } /** @@ -138,7 +146,7 @@ public Template decryptTemplateCredentials(Template template) { * @param cipherFunction the encryption/decryption function to apply on credential values * @return template with encrypted credentials */ - private Template processTemplateCredentials(Template template, Function cipherFunction) { + private Template processTemplateCredentials(Template template, BiFunction cipherFunction) { Map processedWorkflows = new HashMap<>(); for (Map.Entry entry : template.workflows().entrySet()) { @@ -148,7 +156,7 @@ private Template processTemplateCredentials(Template template, Function credentials = new HashMap<>((Map) node.userInputs().get(CREDENTIAL_FIELD)); - credentials.replaceAll((key, cred) -> cipherFunction.apply(cred)); + credentials.replaceAll((key, cred) -> cipherFunction.apply(cred, template.getTenantId())); // Replace credentials field in node user inputs Map processedUserInputs = new HashMap<>(); @@ -178,12 +186,13 @@ private Template processTemplateCredentials(Template template, Function encryptResult = crypto.encryptData( jceMasterKey, @@ -195,13 +204,13 @@ String encrypt(final String credential) { /** * Decrypts the given credential * @param encryptedCredential the credential to decrypt + * @param tenantId The tenant id. If null, decrypts for the default tenant id. * @return the decrypted credential */ - String decrypt(final String encryptedCredential) { - initializeMasterKeyIfAbsent(); + String decrypt(final String encryptedCredential, @Nullable String tenantId) { + initializeMasterKeyIfAbsent(tenantId); final AwsCrypto crypto = AwsCrypto.builder().withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt).build(); - - byte[] bytes = Base64.getDecoder().decode(masterKey); + byte[] bytes = Base64.getDecoder().decode(getMasterKey(tenantId)); JceMasterKey jceMasterKey = JceMasterKey.getInstance(new SecretKeySpec(bytes, ALGORITHM), PROVIDER, "", WRAPPING_ALGORITHM); final CryptoResult decryptedResult = crypto.decryptData( @@ -251,9 +260,10 @@ public Template redactTemplateSecuredFields(User user, Template template) { /** * Retrieves an existing master key or generates a new key to index + * @param tenantId The tenant id. If null, initializes the key for the default tenant id. * @param listener the action listener */ - public void initializeMasterKey(ActionListener listener) { + public void initializeMasterKey(@Nullable String tenantId, ActionListener listener) { // Index has either been created or it already exists, need to check if master key has been initalized already, if not then // generate // This is necessary in case of global context index restoration from snapshot, will need to use the same master key to decrypt @@ -273,7 +283,7 @@ public void initializeMasterKey(ActionListener listener) { context.restore(); // Set generated key to master logger.info("Config has been initialized successfully"); - this.masterKey = config.masterKey(); + setMasterKey(tenantId, config.masterKey()); listener.onResponse(true); }, indexException -> { logger.error("Failed to index config", indexException); @@ -292,7 +302,7 @@ public void initializeMasterKey(ActionListener listener) { ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); Config config = Config.parse(parser); - this.masterKey = config.masterKey(); + setMasterKey(tenantId, config.masterKey()); listener.onResponse(true); } catch (FlowFrameworkException e) { listener.onFailure(e); @@ -311,9 +321,10 @@ public void initializeMasterKey(ActionListener listener) { /** * Retrieves master key from system index if not yet set + * @param tenantId The tenant id. If null, initializes the key for the default id. */ - void initializeMasterKeyIfAbsent() { - if (masterKey != null) { + void initializeMasterKeyIfAbsent(@Nullable String tenantId) { + if (this.tenantMasterKeys.containsKey(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID))) { return; } @@ -333,7 +344,7 @@ void initializeMasterKeyIfAbsent() { ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); Config config = Config.parse(parser); - this.masterKey = config.masterKey(); + setMasterKey(tenantId, config.masterKey()); } } else { throw new FlowFrameworkException("Master key has not been initialized in config index", RestStatus.NOT_FOUND); diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index e3cf73ee..50f8b90f 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -84,6 +84,7 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -321,10 +322,10 @@ public void testFailedToCreateNewWorkflow() { // Bypass initializeConfigIndex and force onResponse doAnswer(invocation -> { - ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(0); + ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(1); initalizeMasterKeyIndexListener.onResponse(true); return null; - }).when(flowFrameworkIndicesHandler).initializeConfigIndex(any()); + }).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any()); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); @@ -352,10 +353,10 @@ public void testCreateNewWorkflow() { // Bypass initializeConfigIndex and force onResponse doAnswer(invocation -> { - ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(0); + ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(1); initalizeMasterKeyIndexListener.onResponse(true); return null; - }).when(flowFrameworkIndicesHandler).initializeConfigIndex(any()); + }).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any()); // Bypass putTemplateToGlobalContext and force onResponse doAnswer(invocation -> { @@ -417,10 +418,10 @@ public void testCreateWithUserAndFilterOn() { // Bypass initializeConfigIndex and force onResponse doAnswer(invocation -> { - ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(0); + ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(1); initalizeMasterKeyIndexListener.onResponse(true); return null; - }).when(flowFrameworkIndicesHandler).initializeConfigIndex(any()); + }).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any()); // Bypass putTemplateToGlobalContext and force onResponse doAnswer(invocation -> { @@ -610,14 +611,7 @@ public void testFailedToUpdateNonExistingWorkflow() throws IOException, Interrup PlainActionFuture future = PlainActionFuture.newFuture(); future.onFailure(new Exception("Failed to retrieve template (2) from global context.")); when(client.get(any(GetRequest.class))).thenReturn(future); - // for the createExecute() method get (not yet migrated) - /* - doAnswer(invocation -> { - ActionListener getListener = (ActionListener) invocation.getArguments()[1]; - getListener.onFailure(new Exception("Failed to retrieve template (2) from global context.")); - return null; - }).when(client).get(any(GetRequest.class), any()); - */ + doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); responseListener.onFailure(new Exception("failed")); @@ -773,10 +767,10 @@ public void testCreateWorkflow_withValidation_withProvision_Success() throws Exc // Bypass initializeConfigIndex and force onResponse doAnswer(invocation -> { - ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(0); + ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(1); initalizeMasterKeyIndexListener.onResponse(true); return null; - }).when(flowFrameworkIndicesHandler).initializeConfigIndex(any()); + }).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any()); // Bypass putTemplateToGlobalContext and force onResponse doAnswer(invocation -> { @@ -833,10 +827,10 @@ public void testCreateWorkflow_withValidation_withProvision_FailedProvisioning() // Bypass initializeConfigIndex and force onResponse doAnswer(invocation -> { - ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(0); + ActionListener initalizeMasterKeyIndexListener = invocation.getArgument(1); initalizeMasterKeyIndexListener.onResponse(true); return null; - }).when(flowFrameworkIndicesHandler).initializeConfigIndex(any()); + }).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any()); // Bypass putTemplateToGlobalContext and force onResponse doAnswer(invocation -> { diff --git a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java index 5bd9c333..40e3159d 100644 --- a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java @@ -114,40 +114,40 @@ public void setUp() throws Exception { public void testGenerateMasterKey() { String generatedMasterKey = encryptorUtils.generateMasterKey(); - encryptorUtils.setMasterKey(generatedMasterKey); - assertEquals(generatedMasterKey, encryptorUtils.getMasterKey()); + encryptorUtils.setMasterKey(null, generatedMasterKey); + assertEquals(generatedMasterKey, encryptorUtils.getMasterKey(null)); } public void testEncryptDecrypt() { - encryptorUtils.setMasterKey(testMasterKey); + encryptorUtils.setMasterKey(null, testMasterKey); String testString = "test"; - String encrypted = encryptorUtils.encrypt(testString); + String encrypted = encryptorUtils.encrypt(testString, null); assertNotNull(encrypted); - String decrypted = encryptorUtils.decrypt(encrypted); + String decrypted = encryptorUtils.decrypt(encrypted, null); assertEquals(testString, decrypted); } public void testEncryptWithDifferentMasterKey() { - encryptorUtils.setMasterKey(testMasterKey); + encryptorUtils.setMasterKey(null, testMasterKey); String testString = "test"; - String encrypted1 = encryptorUtils.encrypt(testString); + String encrypted1 = encryptorUtils.encrypt(testString, null); assertNotNull(encrypted1); // Change the master key prior to encryption String differentMasterKey = encryptorUtils.generateMasterKey(); - encryptorUtils.setMasterKey(differentMasterKey); - String encrypted2 = encryptorUtils.encrypt(testString); + encryptorUtils.setMasterKey(null, differentMasterKey); + String encrypted2 = encryptorUtils.encrypt(testString, null); assertNotEquals(encrypted1, encrypted2); } public void testInitializeMasterKeySuccess() throws IOException { - encryptorUtils.setMasterKey(null); - String masterKey = encryptorUtils.generateMasterKey(); // Index exists case + // reinitialize with blank master key + this.encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry); BytesReference bytesRef; try (XContentBuilder builder = XContentFactory.jsonBuilder()) { Config config = new Config(masterKey, Instant.now()); @@ -167,15 +167,16 @@ public void testInitializeMasterKeySuccess() throws IOException { }).when(client).get(any(GetRequest.class), any()); ActionListener listener = ActionListener.wrap(b -> {}, e -> {}); - encryptorUtils.initializeMasterKey(listener); - assertEquals(masterKey, encryptorUtils.getMasterKey()); + encryptorUtils.initializeMasterKey(null, listener); + assertEquals(masterKey, encryptorUtils.getMasterKey(null)); // Test ifAbsent version - encryptorUtils.setMasterKey(null); - assertNull(encryptorUtils.getMasterKey()); + // reinitialize with blank master key + this.encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry); + assertNull(encryptorUtils.getMasterKey(null)); - encryptorUtils.initializeMasterKeyIfAbsent(); - assertEquals(masterKey, encryptorUtils.getMasterKey()); + encryptorUtils.initializeMasterKeyIfAbsent(null); + assertEquals(masterKey, encryptorUtils.getMasterKey(null)); // No index exists case doAnswer(invocation -> { @@ -193,14 +194,15 @@ public void testInitializeMasterKeySuccess() throws IOException { }).when(client).index(any(IndexRequest.class), any()); listener = ActionListener.wrap(b -> {}, e -> {}); - encryptorUtils.initializeMasterKey(listener); + encryptorUtils.initializeMasterKey(null, listener); // This will generate a new master key 32 bytes -> base64 encoded - assertNotEquals(masterKey, encryptorUtils.getMasterKey()); - assertEquals(masterKey.length(), encryptorUtils.getMasterKey().length()); + assertNotEquals(masterKey, encryptorUtils.getMasterKey(null)); + assertEquals(masterKey.length(), encryptorUtils.getMasterKey(null).length()); } public void testInitializeMasterKeyFailure() { - encryptorUtils.setMasterKey(null); + // reinitialize with blank master key + this.encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry); doAnswer(invocation -> { ActionListener getRequestActionListener = invocation.getArgument(1); @@ -212,12 +214,12 @@ public void testInitializeMasterKeyFailure() { return null; }).when(client).get(any(GetRequest.class), any()); - FlowFrameworkException ex = expectThrows(FlowFrameworkException.class, () -> encryptorUtils.initializeMasterKeyIfAbsent()); + FlowFrameworkException ex = expectThrows(FlowFrameworkException.class, () -> encryptorUtils.initializeMasterKeyIfAbsent(null)); assertEquals("Failed to get master key from config index", ex.getMessage()); } public void testEncryptDecryptTemplateCredential() { - encryptorUtils.setMasterKey(testMasterKey); + encryptorUtils.setMasterKey(null, testMasterKey); // Ecnrypt template with credential field Template processedtemplate = encryptorUtils.encryptTemplateCredentials(testTemplate);