Skip to content

Commit

Permalink
Pass tenantId to IndicesHandler and use in EncryptorUtils
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 18, 2024
1 parent a41b639 commit 5450544
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,16 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe

/**
* Initializes config index and EncryptorUtils
* @param tenantId the tenant id
* @param listener action listener
*/
public void initializeConfigIndex(ActionListener<Boolean> listener) {
public void initializeConfigIndex(String tenantId, ActionListener<Boolean> listener) {
initConfigIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {
listener.onFailure(new FlowFrameworkException("No response to create config index", INTERNAL_SERVER_ERROR));
return;
}
encryptorUtils.initializeMasterKey(listener);
encryptorUtils.initializeMasterKey(tenantId, listener);
}, createIndexException -> {
logger.error("Failed to create config index");
listener.onFailure(createIndexException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private void createExecute(WorkflowRequest request, User user, String tenantId,
return;
} else {
// Initialize config index and create new global context and state index entries
flowFrameworkIndicesHandler.initializeConfigIndex(ActionListener.wrap(isInitialized -> {
flowFrameworkIndicesHandler.initializeConfigIndex(tenantId, ActionListener.wrap(isInitialized -> {
if (FALSE.equals(isInitialized)) {
listener.onFailure(
new FlowFrameworkException("Failed to initalize config index", RestStatus.INTERNAL_SERVER_ERROR)
Expand Down
55 changes: 33 additions & 22 deletions src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.commons.authuser.User;
Expand All @@ -42,7 +43,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CommitmentPolicy;
Expand All @@ -67,10 +70,13 @@ public class EncryptorUtils {
// https://github.com/aws/aws-encryption-sdk-java/issues/1879
private static final String WRAPPING_ALGORITHM = "AES/GCM/NOPADDING";

// concurrent map can't have null as a key. This key is to support single tenancy
public static final String DEFAULT_TENANT_ID = "03000200-0400-0500-0006-000700080009";

private final ClusterService clusterService;
private final Client client;
private final SdkClient sdkClient;
private String masterKey;
private final Map<String, String> tenantMasterKeys;
private final NamedXContentRegistry xContentRegistry;

/**
Expand All @@ -80,7 +86,7 @@ public class EncryptorUtils {
* @param xContentRegistry the OpenSearch XContent Registry
*/
public EncryptorUtils(ClusterService clusterService, Client client, SdkClient sdkClient, NamedXContentRegistry xContentRegistry) {
this.masterKey = null;
this.tenantMasterKeys = new ConcurrentHashMap<>();
this.clusterService = clusterService;
this.client = client;
this.sdkClient = sdkClient;
Expand All @@ -89,18 +95,20 @@ public EncryptorUtils(ClusterService clusterService, Client client, SdkClient sd

/**
* Sets the master key
* @param tenantId The tenant id. If null, sets the key for the default id.
* @param masterKey the master key
*/
void setMasterKey(String masterKey) {
this.masterKey = masterKey;
void setMasterKey(@Nullable String tenantId, String masterKey) {
this.tenantMasterKeys.put(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID), masterKey);
}

/**
* Returns the master key
* @param tenantId The tenant id. If null, gets the key for the default id.
* @return the master key
*/
String getMasterKey() {
return this.masterKey;
String getMasterKey(@Nullable String tenantId) {
return tenantMasterKeys.get(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID));
}

/**
Expand Down Expand Up @@ -138,7 +146,7 @@ public Template decryptTemplateCredentials(Template template) {
* @param cipherFunction the encryption/decryption function to apply on credential values
* @return template with encrypted credentials
*/
private Template processTemplateCredentials(Template template, Function<String, String> cipherFunction) {
private Template processTemplateCredentials(Template template, BiFunction<String, String, String> cipherFunction) {
Map<String, Workflow> processedWorkflows = new HashMap<>();
for (Map.Entry<String, Workflow> entry : template.workflows().entrySet()) {

Expand All @@ -148,7 +156,7 @@ private Template processTemplateCredentials(Template template, Function<String,
// Apply the cipher funcion on all values within credential field
@SuppressWarnings("unchecked")
Map<String, String> credentials = new HashMap<>((Map<String, String>) node.userInputs().get(CREDENTIAL_FIELD));
credentials.replaceAll((key, cred) -> cipherFunction.apply(cred));
credentials.replaceAll((key, cred) -> cipherFunction.apply(cred, template.getTenantId()));

// Replace credentials field in node user inputs
Map<String, Object> processedUserInputs = new HashMap<>();
Expand Down Expand Up @@ -178,12 +186,13 @@ private Template processTemplateCredentials(Template template, Function<String,
/**
* Encrypts the given credential
* @param credential the credential to encrypt
* @param tenantId The tenant id. If null, encrypts for the default tenant id.
* @return the encrypted credential
*/
String encrypt(final String credential) {
initializeMasterKeyIfAbsent();
String encrypt(final String credential, @Nullable String tenantId) {
initializeMasterKeyIfAbsent(tenantId);
final AwsCrypto crypto = AwsCrypto.builder().withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt).build();
byte[] bytes = Base64.getDecoder().decode(masterKey);
byte[] bytes = Base64.getDecoder().decode(getMasterKey(tenantId));
JceMasterKey jceMasterKey = JceMasterKey.getInstance(new SecretKeySpec(bytes, ALGORITHM), PROVIDER, "", WRAPPING_ALGORITHM);
final CryptoResult<byte[], JceMasterKey> encryptResult = crypto.encryptData(
jceMasterKey,
Expand All @@ -195,13 +204,13 @@ String encrypt(final String credential) {
/**
* Decrypts the given credential
* @param encryptedCredential the credential to decrypt
* @param tenantId The tenant id. If null, decrypts for the default tenant id.
* @return the decrypted credential
*/
String decrypt(final String encryptedCredential) {
initializeMasterKeyIfAbsent();
String decrypt(final String encryptedCredential, @Nullable String tenantId) {
initializeMasterKeyIfAbsent(tenantId);
final AwsCrypto crypto = AwsCrypto.builder().withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt).build();

byte[] bytes = Base64.getDecoder().decode(masterKey);
byte[] bytes = Base64.getDecoder().decode(getMasterKey(tenantId));
JceMasterKey jceMasterKey = JceMasterKey.getInstance(new SecretKeySpec(bytes, ALGORITHM), PROVIDER, "", WRAPPING_ALGORITHM);

final CryptoResult<byte[], JceMasterKey> decryptedResult = crypto.decryptData(
Expand Down Expand Up @@ -251,9 +260,10 @@ public Template redactTemplateSecuredFields(User user, Template template) {

/**
* Retrieves an existing master key or generates a new key to index
* @param tenantId The tenant id. If null, initializes the key for the default tenant id.
* @param listener the action listener
*/
public void initializeMasterKey(ActionListener<Boolean> listener) {
public void initializeMasterKey(@Nullable String tenantId, ActionListener<Boolean> listener) {
// Index has either been created or it already exists, need to check if master key has been initalized already, if not then
// generate
// This is necessary in case of global context index restoration from snapshot, will need to use the same master key to decrypt
Expand All @@ -273,7 +283,7 @@ public void initializeMasterKey(ActionListener<Boolean> listener) {
context.restore();
// Set generated key to master
logger.info("Config has been initialized successfully");
this.masterKey = config.masterKey();
setMasterKey(tenantId, config.masterKey());
listener.onResponse(true);
}, indexException -> {
logger.error("Failed to index config", indexException);
Expand All @@ -292,7 +302,7 @@ public void initializeMasterKey(ActionListener<Boolean> listener) {
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Config config = Config.parse(parser);
this.masterKey = config.masterKey();
setMasterKey(tenantId, config.masterKey());
listener.onResponse(true);
} catch (FlowFrameworkException e) {
listener.onFailure(e);
Expand All @@ -311,9 +321,10 @@ public void initializeMasterKey(ActionListener<Boolean> listener) {

/**
* Retrieves master key from system index if not yet set
* @param tenantId The tenant id. If null, initializes the key for the default id.
*/
void initializeMasterKeyIfAbsent() {
if (masterKey != null) {
void initializeMasterKeyIfAbsent(@Nullable String tenantId) {
if (this.tenantMasterKeys.containsKey(Objects.requireNonNullElse(tenantId, DEFAULT_TENANT_ID))) {
return;
}

Expand All @@ -333,7 +344,7 @@ void initializeMasterKeyIfAbsent() {
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Config config = Config.parse(parser);
this.masterKey = config.masterKey();
setMasterKey(tenantId, config.masterKey());
}
} else {
throw new FlowFrameworkException("Master key has not been initialized in config index", RestStatus.NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -321,10 +322,10 @@ public void testFailedToCreateNewWorkflow() {

// Bypass initializeConfigIndex and force onResponse
doAnswer(invocation -> {
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(0);
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(1);
initalizeMasterKeyIndexListener.onResponse(true);
return null;
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(any());
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any());

doAnswer(invocation -> {
ActionListener<IndexResponse> responseListener = invocation.getArgument(1);
Expand Down Expand Up @@ -352,10 +353,10 @@ public void testCreateNewWorkflow() {

// Bypass initializeConfigIndex and force onResponse
doAnswer(invocation -> {
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(0);
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(1);
initalizeMasterKeyIndexListener.onResponse(true);
return null;
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(any());
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any());

// Bypass putTemplateToGlobalContext and force onResponse
doAnswer(invocation -> {
Expand Down Expand Up @@ -417,10 +418,10 @@ public void testCreateWithUserAndFilterOn() {

// Bypass initializeConfigIndex and force onResponse
doAnswer(invocation -> {
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(0);
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(1);
initalizeMasterKeyIndexListener.onResponse(true);
return null;
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(any());
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any());

// Bypass putTemplateToGlobalContext and force onResponse
doAnswer(invocation -> {
Expand Down Expand Up @@ -610,14 +611,7 @@ public void testFailedToUpdateNonExistingWorkflow() throws IOException, Interrup
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
future.onFailure(new Exception("Failed to retrieve template (2) from global context."));
when(client.get(any(GetRequest.class))).thenReturn(future);
// for the createExecute() method get (not yet migrated)
/*
doAnswer(invocation -> {
ActionListener<GetResponse> getListener = (ActionListener<GetResponse>) invocation.getArguments()[1];
getListener.onFailure(new Exception("Failed to retrieve template (2) from global context."));
return null;
}).when(client).get(any(GetRequest.class), any());
*/

doAnswer(invocation -> {
ActionListener<IndexResponse> responseListener = invocation.getArgument(2);
responseListener.onFailure(new Exception("failed"));
Expand Down Expand Up @@ -773,10 +767,10 @@ public void testCreateWorkflow_withValidation_withProvision_Success() throws Exc

// Bypass initializeConfigIndex and force onResponse
doAnswer(invocation -> {
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(0);
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(1);
initalizeMasterKeyIndexListener.onResponse(true);
return null;
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(any());
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any());

// Bypass putTemplateToGlobalContext and force onResponse
doAnswer(invocation -> {
Expand Down Expand Up @@ -833,10 +827,10 @@ public void testCreateWorkflow_withValidation_withProvision_FailedProvisioning()

// Bypass initializeConfigIndex and force onResponse
doAnswer(invocation -> {
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(0);
ActionListener<Boolean> initalizeMasterKeyIndexListener = invocation.getArgument(1);
initalizeMasterKeyIndexListener.onResponse(true);
return null;
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(any());
}).when(flowFrameworkIndicesHandler).initializeConfigIndex(nullable(String.class), any());

// Bypass putTemplateToGlobalContext and force onResponse
doAnswer(invocation -> {
Expand Down
Loading

0 comments on commit 5450544

Please sign in to comment.