Skip to content

Commit

Permalink
Fix init encryption master key (opensearch-project#2554)
Browse files Browse the repository at this point in the history
* fix init master key

Signed-off-by: Yaliang Wu <[email protected]>
  • Loading branch information
ylwu-amzn authored Jun 14, 2024
1 parent 4c04854 commit 487f33a
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.opensearch.ml.common.CommonValue.MASTER_KEY;
import static org.opensearch.ml.common.CommonValue.ML_CONFIG_INDEX;
import static org.opensearch.ml.common.MLConfig.CREATE_TIME_FIELD;

import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import javax.crypto.spec.SecretKeySpec;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.ml.common.exception.MLException;
import org.opensearch.ml.engine.indices.MLIndicesHandler;

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CommitmentPolicy;
import com.amazonaws.encryptionsdk.CryptoResult;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.google.common.collect.ImmutableMap;

import lombok.extern.log4j.Log4j2;

Expand All @@ -42,11 +49,13 @@ public class EncryptorImpl implements Encryptor {
private ClusterService clusterService;
private Client client;
private volatile String masterKey;
private MLIndicesHandler mlIndicesHandler;

public EncryptorImpl(ClusterService clusterService, Client client) {
public EncryptorImpl(ClusterService clusterService, Client client, MLIndicesHandler mlIndicesHandler) {
this.masterKey = null;
this.clusterService = clusterService;
this.client = client;
this.mlIndicesHandler = mlIndicesHandler;
}

public EncryptorImpl(String masterKey) {
Expand Down Expand Up @@ -104,28 +113,68 @@ private void initMasterKey() {
AtomicReference<Exception> exceptionRef = new AtomicReference<>();

CountDownLatch latch = new CountDownLatch(1);
if (clusterService.state().metadata().hasIndex(ML_CONFIG_INDEX)) {
mlIndicesHandler.initMLConfigIndex(ActionListener.wrap(r -> {
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
GetRequest getRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
client.get(getRequest, ActionListener.runBefore(new LatchedActionListener(ActionListener.<GetResponse>wrap(r -> {
if (r.isExists()) {
String masterKey = (String) r.getSourceAsMap().get(MASTER_KEY);
this.masterKey = masterKey;
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse == null || !getResponse.isExists()) {
IndexRequest indexRequest = new IndexRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
final String generatedMasterKey = generateMasterKey();
indexRequest
.source(ImmutableMap.of(MASTER_KEY, generatedMasterKey, CREATE_TIME_FIELD, Instant.now().toEpochMilli()));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, ActionListener.wrap(indexResponse -> {
this.masterKey = generatedMasterKey;
log.info("ML encryption master key initialized successfully");
latch.countDown();
}, e -> {

if (ExceptionUtils.getRootCause(e) instanceof VersionConflictEngineException) {
GetRequest getMasterKeyRequest = new GetRequest(ML_CONFIG_INDEX).id(MASTER_KEY);
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
client.get(getMasterKeyRequest, ActionListener.wrap(getMasterKeyResponse -> {
if (getMasterKeyResponse != null && getMasterKeyResponse.isExists()) {
final String masterKey = (String) getMasterKeyResponse.getSourceAsMap().get(MASTER_KEY);
this.masterKey = masterKey;
log.info("ML encryption master key already initialized, no action needed");
latch.countDown();
} else {
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
latch.countDown();
}
}, error -> {
log.debug("Failed to get ML encryption master key", e);
exceptionRef.set(error);
latch.countDown();
}));
}
} else {
log.debug("Failed to index ML encryption master key", e);
exceptionRef.set(e);
latch.countDown();
}
}));
} else {
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
final String masterKey = (String) getResponse.getSourceAsMap().get(MASTER_KEY);
this.masterKey = masterKey;
log.info("ML encryption master key already initialized, no action needed");
latch.countDown();
}
}, e -> {
log.error("Failed to get ML encryption master key", e);
log.debug("Failed to get ML encryption master key from config index", e);
exceptionRef.set(e);
}), latch), () -> context.restore()));
latch.countDown();
}));
}
} else {
exceptionRef.set(new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR));
}, e -> {
log.debug("Failed to init ML config index", e);
exceptionRef.set(e);
latch.countDown();
}
}));

try {
latch.await(5, SECONDS);
latch.await(1, SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
Expand All @@ -142,4 +191,5 @@ private void initMasterKey() {
throw new ResourceNotFoundException(MASTER_KEY_NOT_READY_ERROR);
}
}

}
Loading

0 comments on commit 487f33a

Please sign in to comment.