Skip to content

Commit

Permalink
Create datasource API
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <[email protected]>
  • Loading branch information
vmmusings committed Mar 23, 2023
1 parent aa92f99 commit 621ee4e
Show file tree
Hide file tree
Showing 49 changed files with 1,780 additions and 370 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ gen
/.prom.pid.lock

.java-version
.worktrees
.worktrees
http-client.env.json
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ buildscript {
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
prometheus_binary_version = "2.37.2"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
if (buildVersionQualifier) {
opensearch_build += "-${buildVersionQualifier}"
}
Expand Down
3 changes: 2 additions & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ dependencies {
api "org.antlr:antlr4-runtime:4.7.1"
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api 'com.amazonaws:aws-encryption-sdk-java:2.4.0'

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.common.encryptor;

public interface Encryptor {

/**
* Takes plaintext and returns encrypted text.
*
* @param plainText plainText.
* @return String encryptedText.
*/
String encrypt(String plainText);

/**
* Takes encryptedText and returns plain text.
*
* @param encryptedText encryptedText.
* @return String plainText.
*/
String decrypt(String encryptedText);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.common.encryptor;

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CommitmentPolicy;
import com.amazonaws.encryptionsdk.CryptoResult;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.spec.SecretKeySpec;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class EncryptorImpl implements Encryptor {

private final String masterKey;

@Override
public String encrypt(String plainText) {

final AwsCrypto crypto = AwsCrypto.builder()
.withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt)
.build();

JceMasterKey jceMasterKey
= JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "",
"AES/GCM/NoPadding");

final CryptoResult<byte[], JceMasterKey> encryptResult = crypto.encryptData(jceMasterKey,
plainText.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(encryptResult.getResult());
}

@Override
public String decrypt(String encryptedText) {
final AwsCrypto crypto = AwsCrypto.builder()
.withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt)
.build();

JceMasterKey jceMasterKey
= JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "",
"AES/GCM/NoPadding");

final CryptoResult<byte[], JceMasterKey> decryptedResult
= crypto.decryptData(jceMasterKey, Base64.getDecoder().decode(encryptedText));
return new String(decryptedResult.getResult());
}

}
3 changes: 2 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ pitest {

dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240'
api group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api group: 'com.google.code.gson', name: 'gson', version: '2.8.9'
api project(':common')

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
Expand Down
24 changes: 16 additions & 8 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.analysis;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
Expand Down Expand Up @@ -134,10 +135,7 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedDataSourceNames = dataSourceService.getDataSourceMetadataSet()
.stream()
.map(DataSourceMetadata::getName)
.collect(Collectors.toSet());
Set<String> allowedDataSourceNames = getAllowedDataSources(qualifiedName);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);
Expand Down Expand Up @@ -182,10 +180,7 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedDataSourceNames = dataSourceService.getDataSourceMetadataSet()
.stream()
.map(DataSourceMetadata::getName)
.collect(Collectors.toSet());
Set<String> allowedDataSourceNames = getAllowedDataSources(qualifiedName);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);
Expand All @@ -210,6 +205,19 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
tableFunctionImplementation.applyArguments());
}

// This method has optimization to avoid metadata search calls to OpenSearch
// by checking if the name has only one part in it.
private Set<String> getAllowedDataSources(QualifiedName qualifiedName) {
Set<String> allowedDataSourceNames = ImmutableSet.of();
if (qualifiedName.getParts().size() != 1) {
allowedDataSourceNames = dataSourceService.getDataSourceMetadataSet()
.stream()
.map(DataSourceMetadata::getName)
.collect(Collectors.toSet());
}
return allowedDataSourceNames;
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public interface DataSourceService {
/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param metadatas list of {@link DataSourceMetadata}.
* @param metadata {@link DataSourceMetadata}.
*/
void createDataSource(DataSourceMetadata... metadatas);
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
Expand All @@ -52,15 +52,4 @@ public interface DataSourceService {
* @param dataSourceName name of the {@link DataSource}.
*/
void deleteDataSource(String dataSourceName);

/**
* This method is to bootstrap
* datasources during the startup of the plugin.
*/
void bootstrapDataSources();

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

package org.opensearch.sql.datasource;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -32,46 +35,62 @@ public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<String, DataSource> dataSourceMap;
private final ConcurrentHashMap<DataSourceMetadata, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

private final DataSourceMetadataStorage dataSourceMetadataStorage;

private final DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper;

/**
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories,
DataSourceMetadataStorage dataSourceMetadataStorage,
DataSourceUserAuthorizationHelper
dataSourceUserAuthorizationHelper) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
this.dataSourceMetadataStorage = dataSourceMetadataStorage;
this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper;
}

@Override
public Set<DataSourceMetadata> getDataSourceMetadataSet() {
return dataSourceMap.values().stream()
.map(dataSource
-> new DataSourceMetadata(dataSource.getName(),
dataSource.getConnectorType(), ImmutableMap.of()))
.collect(Collectors.toSet());
List<DataSourceMetadata> dataSourceMetadataList
= this.dataSourceMetadataStorage.getDataSourceMetadata();
Set<DataSourceMetadata> dataSourceMetadataSet = new HashSet<>(dataSourceMetadataList);
dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
return dataSourceMetadataSet;
}


@Override
public DataSource getDataSource(String dataSourceName) {
if (!dataSourceMap.containsKey(dataSourceName)) {
Optional<DataSourceMetadata>
dataSourceMetadataOptional = getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
this.dataSourceUserAuthorizationHelper
.authorizeDataSource(dataSourceMetadata);
return getDataSourceFromMetadata(dataSourceMetadata);
}
return dataSourceMap.get(dataSourceName);
}

@Override
public void createDataSource(DataSourceMetadata... metadatas) {
for (DataSourceMetadata metadata : metadatas) {
validateDataSourceMetaData(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
public void createDataSource(DataSourceMetadata metadata) {
validateDataSourceMetaData(metadata);
if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceMetadataStorage.createDataSourceMetadata(metadata);
}
dataSourceMap.put(metadata,
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}

@Override
Expand All @@ -84,15 +103,6 @@ public void deleteDataSource(String dataSourceName) {
throw new UnsupportedOperationException("will be supported in future");
}

@Override
public void bootstrapDataSources() {
throw new UnsupportedOperationException("will be supported in future");
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
Expand All @@ -103,18 +113,38 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(metadata.getName()),
"Missing Name Field from a DataSource. Name is a required parameter.");
Preconditions.checkArgument(
!dataSourceMap.containsKey(metadata.getName()),
StringUtils.format(
"Datasource name should be unique, Duplicate datasource found %s.",
metadata.getName()));
Preconditions.checkArgument(
metadata.getName().matches(DATASOURCE_NAME_REGEX),
StringUtils.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
Preconditions.checkArgument(
!Objects.isNull(metadata.getProperties()),
"Missing properties field in catalog configuration. Properties are required parameters.");
"Missing properties field in datasource configuration."
+ " Properties are required parameters.");
}

private Optional<DataSourceMetadata> getDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
} else {
return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
}
}

private DataSource getDataSourceFromMetadata(DataSourceMetadata dataSourceMetadata) {
if (!dataSourceMap.containsKey(dataSourceMetadata)) {
clearDataSource(dataSourceMetadata);
dataSourceMap.put(dataSourceMetadata,
dataSourceFactoryMap.get(dataSourceMetadata.getConnector())
.createDataSource(dataSourceMetadata));
}
return dataSourceMap.get(dataSourceMetadata);
}

private void clearDataSource(DataSourceMetadata dataSourceMetadata) {
dataSourceMap.entrySet()
.removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.sql.datasource;

import java.util.List;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* Interface for datasource authorization helper.
* The implementation of this class helps in determining
* if authorization is required and the roles associated with the user.
*/
public interface DataSourceUserAuthorizationHelper {

/**
* Authorizes DataSource within the current context.
*
* @param dataSourceMetadata {@link DataSourceMetadata}
*/
void authorizeDataSource(DataSourceMetadata dataSourceMetadata);
}
Loading

0 comments on commit 621ee4e

Please sign in to comment.