From ed6715af7746b0bf2f95b2c62c7feaa7c7a7b03a Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 27 Mar 2023 15:14:15 -0700 Subject: [PATCH] Create datasource API (#1458) Signed-off-by: vamsi-amazon --- .gitignore | 3 +- build.gradle | 1 + common/build.gradle | 3 +- .../sql/common/encryptor/Encryptor.java | 28 ++ .../sql/common/encryptor/EncryptorImpl.java | 55 ++++ core/build.gradle | 3 +- .../org/opensearch/sql/analysis/Analyzer.java | 17 +- ...ataSourceSchemaIdentifierNameResolver.java | 21 +- .../sql/datasource/DataSourceService.java | 17 +- .../sql/datasource/DataSourceServiceImpl.java | 94 ++++--- .../DataSourceUserAuthorizationHelper.java | 19 ++ .../datasource/model/DataSourceMetadata.java | 17 +- .../sql/datasource/model/DataSourceType.java | 31 ++- .../sql/analysis/AnalyzerTestBase.java | 26 +- ...ourceSchemaIdentifierNameResolverTest.java | 31 ++- .../datasource/DataSourceServiceImplTest.java | 206 ++++++++++----- .../datasource/DataSourceTableScanTest.java | 4 +- docs/user/dql/metadata.rst | 1 + docs/user/ppl/admin/datasources.rst | 95 ++++++- doctest/test_data/datasources.json | 1 + doctest/test_docs.py | 2 + .../sql/datasource/DataSourceAPIsIT.java | 49 ++++ .../sql/legacy/SQLIntegTestCase.java | 8 +- .../opensearch/sql/legacy/TestsConstants.java | 1 + .../sql/ppl/InformationSchemaCommandIT.java | 10 + .../ppl/PrometheusDataSourceCommandsIT.java | 6 + .../sql/ppl/ShowDataSourcesCommandIT.java | 4 + .../org/opensearch/sql/ppl/StandaloneIT.java | 46 +++- .../src/test/resources/datasources.json | 2 + .../datasources_index_mappings.json | 17 ++ legacy/build.gradle | 2 +- .../sql/legacy/metrics/MetricName.java | 5 +- plugin/build.gradle | 10 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 79 ++---- .../plugin/datasource/DataSourceSettings.java | 6 + ...DataSourceUserAuthorizationHelperImpl.java | 56 ++++ .../OpenSearchDataSourceMetadataStorage.java | 244 ++++++++++++++++++ .../model/CreateDataSourceActionRequest.java | 49 ++++ .../model/CreateDataSourceActionResponse.java | 33 +++ .../rest/RestDataSourceQueryAction.java | 131 ++++++++++ .../TransportCreateDataSourceAction.java | 73 ++++++ .../sql/plugin/utils/Scheduler.java | 33 +++ .../sql/plugin/utils/XContentParserUtils.java | 116 +++++++++ .../resources/datasources-index-mapping.yml | 19 ++ .../resources/datasources-index-settings.yml | 11 + .../datasource/DataSourceMetaDataTest.java | 130 ---------- ...SourceUserAuthorizationHelperImplTest.java | 80 ++++++ ...enSearchDataSourceMetadataStorageTest.java | 220 ++++++++++++++++ .../sql/plugin/utils/SchedulerTest.java | 47 ++++ .../org.mockito.plugins.MockMaker | 1 + .../storage/PrometheusStorageFactory.java | 19 +- 51 files changed, 1815 insertions(+), 367 deletions(-) create mode 100644 common/src/main/java/org/opensearch/sql/common/encryptor/Encryptor.java create mode 100644 common/src/main/java/org/opensearch/sql/common/encryptor/EncryptorImpl.java create mode 100644 core/src/main/java/org/opensearch/sql/datasource/DataSourceUserAuthorizationHelper.java create mode 100644 doctest/test_data/datasources.json create mode 100644 integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java create mode 100644 integ-test/src/test/resources/datasources.json create mode 100644 integ-test/src/test/resources/indexDefinitions/datasources_index_mappings.json create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImpl.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionRequest.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionResponse.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/utils/Scheduler.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/utils/XContentParserUtils.java create mode 100644 plugin/src/main/resources/datasources-index-mapping.yml create mode 100644 plugin/src/main/resources/datasources-index-settings.yml delete mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImplTest.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/utils/SchedulerTest.java create mode 100644 plugin/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/.gitignore b/.gitignore index 46d984ce4f..2b1f9be522 100644 --- a/.gitignore +++ b/.gitignore @@ -46,4 +46,5 @@ gen /.prom.pid.lock .java-version -.worktrees \ No newline at end of file +.worktrees +http-client.env.json \ No newline at end of file diff --git a/build.gradle b/build.gradle index fe5856f2e5..e42c6ea055 100644 --- a/build.gradle +++ b/build.gradle @@ -12,6 +12,7 @@ buildscript { version_tokens = opensearch_version.tokenize('-') opensearch_build = version_tokens[0] + '.0' prometheus_binary_version = "2.37.2" + common_utils_version = System.getProperty("common_utils.version", opensearch_build) if (buildVersionQualifier) { opensearch_build += "-${buildVersionQualifier}" } diff --git a/common/build.gradle b/common/build.gradle index 533fccd9b2..da6b591961 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -35,7 +35,8 @@ dependencies { api "org.antlr:antlr4-runtime:4.7.1" api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1' - api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' + api 'com.amazonaws:aws-encryption-sdk-java:2.4.0' testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' diff --git a/common/src/main/java/org/opensearch/sql/common/encryptor/Encryptor.java b/common/src/main/java/org/opensearch/sql/common/encryptor/Encryptor.java new file mode 100644 index 0000000000..a886b72328 --- /dev/null +++ b/common/src/main/java/org/opensearch/sql/common/encryptor/Encryptor.java @@ -0,0 +1,28 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.common.encryptor; + +public interface Encryptor { + + /** + * Takes plaintext and returns encrypted text. + * + * @param plainText plainText. + * @return String encryptedText. + */ + String encrypt(String plainText); + + /** + * Takes encryptedText and returns plain text. + * + * @param encryptedText encryptedText. + * @return String plainText. + */ + String decrypt(String encryptedText); + +} \ No newline at end of file diff --git a/common/src/main/java/org/opensearch/sql/common/encryptor/EncryptorImpl.java b/common/src/main/java/org/opensearch/sql/common/encryptor/EncryptorImpl.java new file mode 100644 index 0000000000..05a0d358fd --- /dev/null +++ b/common/src/main/java/org/opensearch/sql/common/encryptor/EncryptorImpl.java @@ -0,0 +1,55 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.common.encryptor; + +import com.amazonaws.encryptionsdk.AwsCrypto; +import com.amazonaws.encryptionsdk.CommitmentPolicy; +import com.amazonaws.encryptionsdk.CryptoResult; +import com.amazonaws.encryptionsdk.jce.JceMasterKey; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import javax.crypto.spec.SecretKeySpec; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class EncryptorImpl implements Encryptor { + + private final String masterKey; + + @Override + public String encrypt(String plainText) { + + final AwsCrypto crypto = AwsCrypto.builder() + .withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt) + .build(); + + JceMasterKey jceMasterKey + = JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "", + "AES/GCM/NoPadding"); + + final CryptoResult encryptResult = crypto.encryptData(jceMasterKey, + plainText.getBytes(StandardCharsets.UTF_8)); + return Base64.getEncoder().encodeToString(encryptResult.getResult()); + } + + @Override + public String decrypt(String encryptedText) { + final AwsCrypto crypto = AwsCrypto.builder() + .withCommitmentPolicy(CommitmentPolicy.RequireEncryptRequireDecrypt) + .build(); + + JceMasterKey jceMasterKey + = JceMasterKey.getInstance(new SecretKeySpec(masterKey.getBytes(), "AES"), "Custom", "", + "AES/GCM/NoPadding"); + + final CryptoResult decryptedResult + = crypto.decryptData(jceMasterKey, Base64.getDecoder().decode(encryptedText)); + return new String(decryptedResult.getResult()); + } + +} \ No newline at end of file diff --git a/core/build.gradle b/core/build.gradle index f5a4a4907a..b3c900e538 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -35,12 +35,13 @@ repositories { dependencies { api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' - api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' api group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1' api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + api group: 'com.google.code.gson', name: 'gson', version: '2.8.9' api project(':common') testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 533977197f..ba40020782 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -64,7 +63,6 @@ import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; @@ -134,13 +132,8 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) { @Override public LogicalPlan visitRelation(Relation node, AnalysisContext context) { QualifiedName qualifiedName = node.getTableQualifiedName(); - Set allowedDataSourceNames = dataSourceService.getDataSourceMetadataSet() - .stream() - .map(DataSourceMetadata::getName) - .collect(Collectors.toSet()); DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver - = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), - allowedDataSourceNames); + = new DataSourceSchemaIdentifierNameResolver(dataSourceService, qualifiedName.getParts()); String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName(); context.push(); TypeEnvironment curEnv = context.peek(); @@ -182,13 +175,9 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext @Override public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) { QualifiedName qualifiedName = node.getFunctionName(); - Set allowedDataSourceNames = dataSourceService.getDataSourceMetadataSet() - .stream() - .map(DataSourceMetadata::getName) - .collect(Collectors.toSet()); DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver - = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), - allowedDataSourceNames); + = new DataSourceSchemaIdentifierNameResolver(this.dataSourceService, + qualifiedName.getParts()); FunctionName functionName = FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName()); diff --git a/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java b/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java index f3552b029b..1bb8316907 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java +++ b/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java @@ -7,8 +7,13 @@ package org.opensearch.sql.analysis; +import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import org.opensearch.sql.ast.expression.QualifiedName; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; public class DataSourceSchemaIdentifierNameResolver { @@ -19,6 +24,7 @@ public class DataSourceSchemaIdentifierNameResolver { private String dataSourceName = DEFAULT_DATASOURCE_NAME; private String schemaName = DEFAULT_SCHEMA_NAME; private String identifierName; + private DataSourceService dataSourceService; private static final String DOT = "."; @@ -28,13 +34,14 @@ public class DataSourceSchemaIdentifierNameResolver { * DataSourceSchemaTable name and DataSourceSchemaFunction in case of table * functions. * + * @param dataSourceService {@link DataSourceService}. * @param parts parts of qualifiedName. - * @param allowedDataSources allowedDataSources. */ - public DataSourceSchemaIdentifierNameResolver(List parts, - Set allowedDataSources) { + public DataSourceSchemaIdentifierNameResolver(DataSourceService dataSourceService, + List parts) { + this.dataSourceService = dataSourceService; List remainingParts - = captureSchemaName(captureDataSourceName(parts, allowedDataSources)); + = captureSchemaName(captureDataSourceName(parts)); identifierName = String.join(DOT, remainingParts); } @@ -53,9 +60,8 @@ public String getSchemaName() { // Capture datasource name and return remaining parts(schema name and table name) // from the fully qualified name. - private List captureDataSourceName(List parts, Set allowedDataSources) { - if (parts.size() > 1 && allowedDataSources.contains(parts.get(0)) - || DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) { + private List captureDataSourceName(List parts) { + if (parts.size() > 1 && dataSourceService.dataSourceExists(parts.get(0))) { dataSourceName = parts.get(0); return parts.subList(1, parts.size()); } else { @@ -76,5 +82,4 @@ private List captureSchemaName(List parts) { } } - } diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java index e45b9cd9c8..f621ce5c55 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -34,9 +34,9 @@ public interface DataSourceService { /** * Register {@link DataSource} defined by {@link DataSourceMetadata}. * - * @param metadatas list of {@link DataSourceMetadata}. + * @param metadata {@link DataSourceMetadata}. */ - void createDataSource(DataSourceMetadata... metadatas); + void createDataSource(DataSourceMetadata metadata); /** * Updates {@link DataSource} corresponding to dataSourceMetadata. @@ -54,13 +54,10 @@ public interface DataSourceService { void deleteDataSource(String dataSourceName); /** - * This method is to bootstrap - * datasources during the startup of the plugin. - */ - void bootstrapDataSources(); - - /** - * remove all the registered {@link DataSource}. + * Returns true {@link Boolean} if datasource with dataSourceName exists + * or else false {@link Boolean}. + * + * @param dataSourceName name of the {@link DataSource}. */ - void clear(); + Boolean dataSourceExists(String dataSourceName); } diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java index 915e5aa287..51bad94af8 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java @@ -5,12 +5,15 @@ package org.opensearch.sql.datasource; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; -import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -32,46 +35,62 @@ public class DataSourceServiceImpl implements DataSourceService { private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - private final ConcurrentHashMap dataSourceMap; + private final ConcurrentHashMap dataSourceMap; private final Map dataSourceFactoryMap; + private final DataSourceMetadataStorage dataSourceMetadataStorage; + + private final DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper; + /** * Construct from the set of {@link DataSourceFactory} at bootstrap time. */ - public DataSourceServiceImpl(Set dataSourceFactories) { + public DataSourceServiceImpl(Set dataSourceFactories, + DataSourceMetadataStorage dataSourceMetadataStorage, + DataSourceUserAuthorizationHelper + dataSourceUserAuthorizationHelper) { dataSourceFactoryMap = dataSourceFactories.stream() .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); dataSourceMap = new ConcurrentHashMap<>(); + this.dataSourceMetadataStorage = dataSourceMetadataStorage; + this.dataSourceUserAuthorizationHelper = dataSourceUserAuthorizationHelper; } @Override public Set getDataSourceMetadataSet() { - return dataSourceMap.values().stream() - .map(dataSource - -> new DataSourceMetadata(dataSource.getName(), - dataSource.getConnectorType(), ImmutableMap.of())) - .collect(Collectors.toSet()); + List dataSourceMetadataList + = this.dataSourceMetadataStorage.getDataSourceMetadata(); + Set dataSourceMetadataSet = new HashSet<>(dataSourceMetadataList); + dataSourceMetadataSet.add(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + return dataSourceMetadataSet; } + @Override public DataSource getDataSource(String dataSourceName) { - if (!dataSourceMap.containsKey(dataSourceName)) { + Optional + dataSourceMetadataOptional = getDataSourceMetadata(dataSourceName); + if (dataSourceMetadataOptional.isEmpty()) { throw new IllegalArgumentException( String.format("DataSource with name %s doesn't exist.", dataSourceName)); + } else { + DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); + this.dataSourceUserAuthorizationHelper + .authorizeDataSource(dataSourceMetadata); + return getDataSourceFromMetadata(dataSourceMetadata); } - return dataSourceMap.get(dataSourceName); } @Override - public void createDataSource(DataSourceMetadata... metadatas) { - for (DataSourceMetadata metadata : metadatas) { - validateDataSourceMetaData(metadata); - dataSourceMap.put( - metadata.getName(), - dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); + public void createDataSource(DataSourceMetadata metadata) { + validateDataSourceMetaData(metadata); + if (!metadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + this.dataSourceMetadataStorage.createDataSourceMetadata(metadata); } + dataSourceMap.put(metadata, + dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); } @Override @@ -85,14 +104,11 @@ public void deleteDataSource(String dataSourceName) { } @Override - public void bootstrapDataSources() { - throw new UnsupportedOperationException("will be supported in future"); + public Boolean dataSourceExists(String dataSourceName) { + return DEFAULT_DATASOURCE_NAME.equals(dataSourceName) + || this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName).isPresent(); } - @Override - public void clear() { - dataSourceMap.clear(); - } /** * This can be moved to a different validator class when we introduce more connectors. @@ -103,11 +119,6 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) { Preconditions.checkArgument( !Strings.isNullOrEmpty(metadata.getName()), "Missing Name Field from a DataSource. Name is a required parameter."); - Preconditions.checkArgument( - !dataSourceMap.containsKey(metadata.getName()), - StringUtils.format( - "Datasource name should be unique, Duplicate datasource found %s.", - metadata.getName())); Preconditions.checkArgument( metadata.getName().matches(DATASOURCE_NAME_REGEX), StringUtils.format( @@ -115,6 +126,31 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) { metadata.getName())); Preconditions.checkArgument( !Objects.isNull(metadata.getProperties()), - "Missing properties field in catalog configuration. Properties are required parameters."); + "Missing properties field in datasource configuration." + + " Properties are required parameters."); } + + private Optional getDataSourceMetadata(String dataSourceName) { + if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) { + return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + } else { + return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName); + } + } + + private DataSource getDataSourceFromMetadata(DataSourceMetadata dataSourceMetadata) { + if (!dataSourceMap.containsKey(dataSourceMetadata)) { + clearDataSource(dataSourceMetadata); + dataSourceMap.put(dataSourceMetadata, + dataSourceFactoryMap.get(dataSourceMetadata.getConnector()) + .createDataSource(dataSourceMetadata)); + } + return dataSourceMap.get(dataSourceMetadata); + } + + private void clearDataSource(DataSourceMetadata dataSourceMetadata) { + dataSourceMap.entrySet() + .removeIf(entry -> entry.getKey().getName().equals(dataSourceMetadata.getName())); + } + } diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceUserAuthorizationHelper.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceUserAuthorizationHelper.java new file mode 100644 index 0000000000..dbbe82a527 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceUserAuthorizationHelper.java @@ -0,0 +1,19 @@ +package org.opensearch.sql.datasource; + +import java.util.List; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +/** + * Interface for datasource authorization helper. + * The implementation of this class helps in determining + * if authorization is required and the roles associated with the user. + */ +public interface DataSourceUserAuthorizationHelper { + + /** + * Authorizes DataSource within the current context. + * + * @param dataSourceMetadata {@link DataSourceMetadata} + */ + void authorizeDataSource(DataSourceMetadata dataSourceMetadata); +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index 854e489acd..27d06d8151 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -12,6 +12,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; @@ -21,22 +25,25 @@ import lombok.Setter; import org.opensearch.sql.datasource.DataSourceService; -@JsonIgnoreProperties(ignoreUnknown = true) @Getter @Setter @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode +@JsonIgnoreProperties(ignoreUnknown = true) public class DataSourceMetadata { - @JsonProperty(required = true) + @JsonProperty private String name; - @JsonProperty(required = true) + @JsonProperty @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) private DataSourceType connector; - @JsonProperty(required = true) + @JsonProperty + private List allowedRoles; + + @JsonProperty private Map properties; /** @@ -45,6 +52,6 @@ public class DataSourceMetadata { */ public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { return new DataSourceMetadata(DEFAULT_DATASOURCE_NAME, - DataSourceType.OPENSEARCH, ImmutableMap.of()); + DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of()); } } diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index 1d3ef6948d..48098b9741 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -6,7 +6,32 @@ package org.opensearch.sql.datasource.model; public enum DataSourceType { - PROMETHEUS, - OPENSEARCH, - JDBC + PROMETHEUS("prometheus"), + OPENSEARCH("opensearch"), + JDBC("jdbc"); + + private String text; + + DataSourceType(String text) { + this.text = text; + } + + public String getText() { + return this.text; + } + + /** + * Get DataSourceType from text. + * + * @param text text. + * @return DataSourceType {@link DataSourceType}. + */ + public static DataSourceType fromString(String text) { + for (DataSourceType dataSourceType : DataSourceType.values()) { + if (dataSourceType.text.equalsIgnoreCase(text)) { + return dataSourceType; + } + } + throw new IllegalArgumentException("No DataSourceType with text " + text + " found"); + } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index 1203232d33..a040e2a53f 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -12,8 +12,6 @@ import static org.opensearch.sql.data.type.ExprCoreType.STRING; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -190,7 +188,13 @@ private class DefaultDataSourceService implements DataSourceService { public Set getDataSourceMetadataSet() { return Stream.of(opensearchDataSource, prometheusDataSource) .map(ds -> new DataSourceMetadata(ds.getName(), - ds.getConnectorType(), ImmutableMap.of())).collect(Collectors.toSet()); + ds.getConnectorType(),Collections.emptyList(), + ImmutableMap.of())).collect(Collectors.toSet()); + } + + @Override + public void createDataSource(DataSourceMetadata metadata) { + throw new UnsupportedOperationException("unsupported operation"); } @Override @@ -202,11 +206,6 @@ public DataSource getDataSource(String dataSourceName) { } } - @Override - public void createDataSource(DataSourceMetadata... metadatas) { - throw new UnsupportedOperationException(); - } - @Override public void updateDataSource(DataSourceMetadata dataSourceMetadata) { @@ -214,17 +213,12 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) { @Override public void deleteDataSource(String dataSourceName) { - - } - - @Override - public void bootstrapDataSources() { - } @Override - public void clear() { - throw new UnsupportedOperationException(); + public Boolean dataSourceExists(String dataSourceName) { + return dataSourceName.equals(DEFAULT_DATASOURCE_NAME) + || dataSourceName.equals("prometheus"); } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java b/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java index b36deb57f8..ac429e89a0 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java @@ -9,6 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_SCHEMA_NAME; import static org.opensearch.sql.analysis.model.DataSourceSchemaIdentifierNameResolverTest.Identifier.identifierOf; @@ -18,14 +20,23 @@ import java.util.List; import java.util.Set; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver; +import org.opensearch.sql.datasource.DataSourceService; +@ExtendWith(MockitoExtension.class) public class DataSourceSchemaIdentifierNameResolverTest { + @Mock + private DataSourceService dataSourceService; + @Test void testFullyQualifiedName() { + when(dataSourceService.dataSourceExists("prom")).thenReturn(Boolean.TRUE); identifierOf( - Arrays.asList("prom", "information_schema", "tables"), Collections.singleton("prom")) + Arrays.asList("prom", "information_schema", "tables"), dataSourceService) .datasource("prom") .schema("information_schema") .name("tables"); @@ -33,19 +44,22 @@ void testFullyQualifiedName() { @Test void defaultDataSourceNameResolve() { - identifierOf(Arrays.asList("tables"), Collections.emptySet()) + when(dataSourceService.dataSourceExists(any())).thenReturn(Boolean.FALSE); + identifierOf(Arrays.asList("tables"), dataSourceService) .datasource(DEFAULT_DATASOURCE_NAME) .schema(DEFAULT_SCHEMA_NAME) .name("tables"); - identifierOf(Arrays.asList("information_schema", "tables"), Collections.emptySet()) + when(dataSourceService.dataSourceExists(any())).thenReturn(Boolean.FALSE); + identifierOf(Arrays.asList("information_schema", "tables"), dataSourceService) .datasource(DEFAULT_DATASOURCE_NAME) .schema("information_schema") .name("tables"); + when(dataSourceService.dataSourceExists(any())).thenReturn(Boolean.TRUE); identifierOf( Arrays.asList(DEFAULT_DATASOURCE_NAME, "information_schema", "tables"), - Collections.emptySet()) + dataSourceService) .datasource(DEFAULT_DATASOURCE_NAME) .schema("information_schema") .name("tables"); @@ -54,12 +68,13 @@ void defaultDataSourceNameResolve() { static class Identifier { private final DataSourceSchemaIdentifierNameResolver resolver; - protected static Identifier identifierOf(List parts, Set allowedDataSources) { - return new Identifier(parts, allowedDataSources); + protected static Identifier identifierOf(List parts, + DataSourceService dataSourceService) { + return new Identifier(parts, dataSourceService); } - Identifier(List parts, Set allowedDataSources) { - resolver = new DataSourceSchemaIdentifierNameResolver(parts, allowedDataSources); + Identifier(List parts, DataSourceService dataSourceService) { + resolver = new DataSourceSchemaIdentifierNameResolver(dataSourceService, parts); } Identifier datasource(String expectedDatasource) { diff --git a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java index b623313c96..68a9475f76 100644 --- a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java +++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java @@ -6,17 +6,28 @@ package org.opensearch.sql.datasource; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; -import org.junit.jupiter.api.AfterEach; +import java.util.Optional; +import java.util.Set; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -33,9 +44,15 @@ class DataSourceServiceImplTest { static final String NAME = "opensearch"; - @Mock private DataSourceFactory dataSourceFactory; + @Mock + private DataSourceFactory dataSourceFactory; + @Mock + private StorageEngine storageEngine; + @Mock + private DataSourceMetadataStorage dataSourceMetadataStorage; - @Mock private StorageEngine storageEngine; + @Mock + private DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper; private DataSourceService dataSourceService; @@ -56,102 +73,158 @@ public void setup() { { add(dataSourceFactory); } - }); - } - - @AfterEach - public void clear() { - dataSourceService.clear(); + }, dataSourceMetadataStorage, + dataSourceUserAuthorizationHelper); } @Test - void getDataSourceSuccess() { + void testGetDataSourceForDefaultOpenSearchDataSource() { + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(any()); dataSourceService.createDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); - assertEquals( new DataSource(DEFAULT_DATASOURCE_NAME, DataSourceType.OPENSEARCH, storageEngine), dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME)); + verifyNoInteractions(dataSourceMetadataStorage); } @Test - void getNotExistDataSourceShouldFail() { + void testGetDataSourceForNonExistingDataSource() { + when(dataSourceMetadataStorage.getDataSourceMetadata("test")) + .thenReturn(Optional.empty()); IllegalArgumentException exception = - assertThrows(IllegalArgumentException.class, () -> dataSourceService.getDataSource("mock")); - assertEquals("DataSource with name mock doesn't exist.", exception.getMessage()); + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.getDataSource("test")); + assertEquals("DataSource with name test doesn't exist.", exception.getMessage()); + verify(dataSourceMetadataStorage, times(1)) + .getDataSourceMetadata("test"); + } + + @Test + void testGetDataSourceSuccessCase() { + DataSourceMetadata dataSourceMetadata = metadata("test", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + when(dataSourceMetadataStorage.getDataSourceMetadata("test")) + .thenReturn(Optional.of(dataSourceMetadata)); + DataSource dataSource = dataSourceService.getDataSource("test"); + assertEquals("test", dataSource.getName()); + assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); + verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("test"); + verify(dataSourceFactory, times(1)) + .createDataSource(dataSourceMetadata); } @Test - void getAddDataSourcesShouldSuccess() { - assertEquals(0, dataSourceService.getDataSourceMetadataSet().size()); + void testGetDataSourceWithAuthorizationFailure() { + DataSourceMetadata dataSourceMetadata = metadata("test", DataSourceType.OPENSEARCH, + Collections.singletonList("prometheus_access"), ImmutableMap.of()); + doThrow(new SecurityException("User is not authorized to access datasource test. " + + "User should be mapped to any of the roles in [prometheus_access] for access.")) + .when(dataSourceUserAuthorizationHelper) + .authorizeDataSource(dataSourceMetadata); + when(dataSourceMetadataStorage.getDataSourceMetadata("test")) + .thenReturn(Optional.of(dataSourceMetadata)); + + + SecurityException securityException + = Assertions.assertThrows(SecurityException.class, + () -> dataSourceService.getDataSource("test")); + Assertions.assertEquals("User is not authorized to access datasource test. " + + "User should be mapped to any of the roles in [prometheus_access] for access.", + securityException.getMessage()); - dataSourceService.createDataSource(metadata(NAME, - DataSourceType.OPENSEARCH, ImmutableMap.of())); - assertEquals(1, dataSourceService.getDataSourceMetadataSet().size()); + verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("test"); + verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); } + @Test - void noDataSourceExistAfterClear() { - dataSourceService.createDataSource(metadata(NAME, - DataSourceType.OPENSEARCH, ImmutableMap.of())); - assertEquals(1, dataSourceService.getDataSourceMetadataSet().size()); + void testCreateDataSourceSuccessCase() { + + DataSourceMetadata dataSourceMetadata = metadata("testDS", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()); + dataSourceService.createDataSource(dataSourceMetadata); + verify(dataSourceMetadataStorage, times(1)) + .createDataSourceMetadata(dataSourceMetadata); + verify(dataSourceFactory, times(1)) + .createDataSource(dataSourceMetadata); - dataSourceService.clear(); - assertEquals(0, dataSourceService.getDataSourceMetadataSet().size()); + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.ofNullable(metadata("testDS", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()))); + DataSource dataSource = dataSourceService.getDataSource("testDS"); + assertEquals("testDS", dataSource.getName()); + assertEquals(storageEngine, dataSource.getStorageEngine()); + assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); + verifyNoMoreInteractions(dataSourceFactory); } @Test - void metaDataMissingNameShouldFail() { + void testCreateDataSourceWithDisallowedDatasourceName() { + DataSourceMetadata dataSourceMetadata = metadata("testDS$$$", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()); IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> - dataSourceService.createDataSource( - metadata(null, DataSourceType.OPENSEARCH, ImmutableMap.of()))); - assertEquals( - "Missing Name Field from a DataSource. Name is a required parameter.", + dataSourceService.createDataSource(dataSourceMetadata)); + assertEquals("DataSource Name: testDS$$$ contains illegal characters." + + " Allowed characters: a-zA-Z0-9_-*@.", exception.getMessage()); + verify(dataSourceFactory, times(1)).getDataSourceType(); + verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); + verifyNoInteractions(dataSourceMetadataStorage); } @Test - void metaDataHasIllegalDataSourceNameShouldFail() { + void testCreateDataSourceWithEmptyDatasourceName() { + DataSourceMetadata dataSourceMetadata = metadata("", DataSourceType.OPENSEARCH, + Collections.emptyList(), ImmutableMap.of()); IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> - dataSourceService.createDataSource( - metadata("prometheus.test", DataSourceType.OPENSEARCH, ImmutableMap.of()))); - assertEquals( - "DataSource Name: prometheus.test contains illegal characters. " - + "Allowed characters: a-zA-Z0-9_-*@.", + dataSourceService.createDataSource(dataSourceMetadata)); + assertEquals("Missing Name Field from a DataSource. Name is a required parameter.", exception.getMessage()); + verify(dataSourceFactory, times(1)).getDataSourceType(); + verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); + verifyNoInteractions(dataSourceMetadataStorage); } @Test - void metaDataMissingPropertiesShouldFail() { + void testCreateDataSourceWithNullParameters() { + DataSourceMetadata dataSourceMetadata = metadata("testDS", DataSourceType.OPENSEARCH, + Collections.emptyList(), null); IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, - () -> dataSourceService.createDataSource(metadata(NAME, - DataSourceType.OPENSEARCH, null))); - assertEquals( - "Missing properties field in catalog configuration. Properties are required parameters.", + () -> + dataSourceService.createDataSource(dataSourceMetadata)); + assertEquals("Missing properties field in datasource configuration. " + + "Properties are required parameters.", exception.getMessage()); + verify(dataSourceFactory, times(1)).getDataSourceType(); + verify(dataSourceFactory, times(0)).createDataSource(dataSourceMetadata); + verifyNoInteractions(dataSourceMetadataStorage); } @Test - void metaDataHasDuplicateNameShouldFail() { - dataSourceService.createDataSource(metadata(NAME, - DataSourceType.OPENSEARCH, ImmutableMap.of())); - assertEquals(1, dataSourceService.getDataSourceMetadataSet().size()); - - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> dataSourceService.createDataSource(metadata(NAME, - DataSourceType.OPENSEARCH, null))); - assertEquals( - String.format("Datasource name should be unique, Duplicate datasource found %s.", NAME), - exception.getMessage()); + void testGetDataSourceMetadataSet() { + when(dataSourceMetadataStorage.getDataSourceMetadata()).thenReturn(new ArrayList<>() { + { + add(metadata("testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), + ImmutableMap.of())); + } + }); + Set dataSourceMetadataSet + = dataSourceService.getDataSourceMetadataSet(); + assertEquals(2, dataSourceMetadataSet.size()); + assertTrue(dataSourceMetadataSet + .contains(DataSourceMetadata.defaultOpenSearchDataSourceMetadata())); + verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata(); } @Test @@ -169,16 +242,29 @@ void testDeleteDatasource() { } @Test - void testLoadDatasource() { - assertThrows( - UnsupportedOperationException.class, - () -> dataSourceService.bootstrapDataSources()); + void testDataSourceExists() { + when(dataSourceMetadataStorage.getDataSourceMetadata("test")) + .thenReturn(Optional.empty()); + Assertions.assertFalse(dataSourceService.dataSourceExists("test")); + when(dataSourceMetadataStorage.getDataSourceMetadata("test")) + .thenReturn(Optional.of(metadata("test", DataSourceType.PROMETHEUS, + List.of(), ImmutableMap.of()))); + Assertions.assertTrue(dataSourceService.dataSourceExists("test")); + } + + @Test + void testDataSourceExistsForDefaultDataSource() { + Assertions.assertTrue(dataSourceService.dataSourceExists(DEFAULT_DATASOURCE_NAME)); + verifyNoInteractions(dataSourceMetadataStorage); } - DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata metadata(String name, DataSourceType type, + List allowedRoles, + Map properties) { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); dataSourceMetadata.setName(name); dataSourceMetadata.setConnector(type); + dataSourceMetadata.setAllowedRoles(allowedRoles); dataSourceMetadata.setProperties(properties); return dataSourceMetadata; } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index 1c45807245..0f95f05944 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Set; @@ -59,7 +60,8 @@ void testIterator() { dataSourceSet.add(new DataSource("opensearch", DataSourceType.OPENSEARCH, storageEngine)); Set dataSourceMetadata = dataSourceSet.stream() .map(dataSource -> new DataSourceMetadata(dataSource.getName(), - dataSource.getConnectorType(), ImmutableMap.of())).collect(Collectors.toSet()); + dataSource.getConnectorType(), Collections.emptyList(), ImmutableMap.of())) + .collect(Collectors.toSet()); when(dataSourceService.getDataSourceMetadataSet()).thenReturn(dataSourceMetadata); assertFalse(dataSourceTableScan.hasNext()); diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index 7ffc2ffe38..21ddc84341 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -52,6 +52,7 @@ Result set: +---------+-----------+----------------+----------+-------+--------+----------+---------+-------------------------+--------------+ + Example 2: Show Specific Index Information ------------------------------------------ diff --git a/docs/user/ppl/admin/datasources.rst b/docs/user/ppl/admin/datasources.rst index 2974ac20ce..5b22ef94a2 100644 --- a/docs/user/ppl/admin/datasources.rst +++ b/docs/user/ppl/admin/datasources.rst @@ -25,7 +25,7 @@ Definitions of datasource and connector Example Prometheus Datasource Definition :: - [{ + { "name" : "my_prometheus", "connector": "prometheus", "properties" : { @@ -33,36 +33,85 @@ Example Prometheus Datasource Definition :: "prometheus.auth.type" : "basicauth", "prometheus.auth.username" : "admin", "prometheus.auth.password" : "admin" - } - }] + }, + "allowedRoles" : ["prometheus_access"] + } Datasource configuration Restrictions. * ``name``, ``connector``, ``properties`` are required fields in the datasource configuration. +* In case of secure domains, ``allowedRoles`` can be used to specify the opensearch roles allowed to access the datasource via PPL/SQL. +* If ``allowedRoles`` are not specified for a datasource, only users with ``all_access`` could access the datasource in case of secure domains. +* In case of security disabled domains, authorization is disbaled. * All the datasource names should be unique and match the following regex[``[@*A-Za-z]+?[*a-zA-Z_\-0-9]*``]. * Allowed Connectors. * ``prometheus`` [More details: `Prometheus Connector `_] * All the allowed config parameters in ``properties`` are defined in individual connector pages mentioned above. -Configuring a datasource in OpenSearch +Datasource configuration Management. ====================================== +Datasource configuration can be managed using below REST APIs. All the examples below are for OpenSearch domains enabled with secure domain. +we can remove authorization and other details in case of security disabled domains. -* Datasources are configured in opensearch keystore as secure settings under ``plugins.query.federation.datasources.config`` key as they contain credential info. -* A json file containing array of datasource configurations should be injected into keystore with the above mentioned key. sample json file can be seen in the above section. +* Datasource Creation POST API ("_plugins/_query/_datasources") :: + POST https://localhost:9200/_plugins/_query/_datasources + content-type: application/json + Authorization: Basic {{username}} {{password}} -[**To be run on all the nodes in the cluster**] Command to add datasources.json file to OpenSearch Keystore :: + { + "name" : "my_prometheus", + "connector": "prometheus", + "properties" : { + "prometheus.uri" : "http://localhost:8080", + "prometheus.auth.type" : "basicauth", + "prometheus.auth.username" : "admin", + "prometheus.auth.password" : "admin" + }, + "allowedRoles" : ["prometheus_access"] + } + +* Datasource modification PUT API ("_plugins/_query/_datasources") :: + + PUT https://localhost:9200/_plugins/_query/_datasources + content-type: application/json + Authorization: Basic {{username}} {{password}} + + { + "name" : "my_prometheus", + "connector": "prometheus", + "properties" : { + "prometheus.uri" : "http://localhost:8080", + "prometheus.auth.type" : "basicauth", + "prometheus.auth.username" : "admin", + "prometheus.auth.password" : "admin" + }, + "allowedRoles" : ["prometheus_access"] + } + +* Datasource Read GET API("_plugins/_query/_datasources/{{dataSourceName}}" :: + + GET https://localhost:9200/_plugins/_query/_datasources/my_prometheus + content-type: application/json + Authorization: Basic {{username}} {{password}} - >> bin/opensearch-keystore add-file plugins.query.federation.datasource.config datasources.json + **Authentication Information won't be vended out in GET API's response.** -Datasources can be configured during opensearch start up or can be updated while the opensearch is running. -If we update a datasource configuration during runtime, the following api should be triggered to update the query engine with the latest changes. +* Datasource Deletion DELETE API("_plugins/_query/_datasources/{{dataSourceName}}") :: -[**Required only if we update keystore settings during runtime**] Secure Settings refresh api:: + DELETE https://localhost:9200/_plugins/_query/_datasources/my_prometheus + content-type: application/json + Authorization: Basic {{username}} {{password}} - >> curl --request POST \ - --url http://{{opensearch-domain}}:9200/_nodes/reload_secure_settings \ - --data '{"secure_settings_password":"{{keystore-password}}"}' +Authorization of datasource configuration APIs +============================================== +Each of the datasource configuration management apis are controlled by following actions respectively. +* cluster:admin/opensearch/datasources/create [Create POST API] +* cluster:admin/opensearch/datasources/read [Get GET API] +* cluster:admin/opensearch/datasources/update [Update PUT API] +* cluster:admin/opensearch/datasources/delete [Delete DELETE API] + +Only users mapped with roles having above actions are authorized to execute datasource management apis. Using a datasource in PPL command ==================================== @@ -77,6 +126,24 @@ Example source command with prometheus datasource :: >> source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by job; +Authorization of PPL commands on datasources +============================================== +In case of secure opensearch domains, only admins and users with roles mentioned in datasource configuration are allowed to make queries. +For example: with below datasource configuration, only admins and users with prometheus_access role can run queries on my_prometheus datasource. :: + + { + "name" : "my_prometheus", + "connector": "prometheus", + "properties" : { + "prometheus.uri" : "http://localhost:8080", + "prometheus.auth.type" : "basicauth", + "prometheus.auth.username" : "admin", + "prometheus.auth.password" : "admin" + }, + "allowedRoles" : ["prometheus_access"] + } + + Limitations of datasource ==================================== Datasource settings are global and users with PPL access are allowed to fetch data from all the defined datasources. diff --git a/doctest/test_data/datasources.json b/doctest/test_data/datasources.json new file mode 100644 index 0000000000..a75a7a68d3 --- /dev/null +++ b/doctest/test_data/datasources.json @@ -0,0 +1 @@ +{ "name" : "my_prometheus", "connector": "prometheus", "properties" : { "prometheus.uri" : "http://localhost:9090"}} \ No newline at end of file diff --git a/doctest/test_docs.py b/doctest/test_docs.py index b5edf46de9..c517b2756c 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -27,6 +27,7 @@ BOOKS = "books" APACHE = "apache" WILDCARD = "wildcard" +DATASOURCES = ".ql-datasources" class DocTestConnection(OpenSearchConnection): @@ -94,6 +95,7 @@ def set_up_test_indices(test): load_file("books.json", index_name=BOOKS) load_file("apache.json", index_name=APACHE) load_file("wildcard.json", index_name=WILDCARD) + load_file("datasources.json", index_name=DATASOURCES) def load_file(filename, index_name): diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java new file mode 100644 index 0000000000..e324e976ba --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -0,0 +1,49 @@ +package org.opensearch.sql.datasource;/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +import com.google.gson.Gson; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class DataSourceAPIsIT extends PPLIntegTestCase { + + @Test + public void createDataSourceTest() throws IOException { + Request request = getCreateDataSourceRequest(getDataSourceMetadataJsonString()); + String response = executeRequest(request); + Assert.assertEquals("Created DataSource with name prometheus1", response); + } + + private Request getCreateDataSourceRequest(String dataSourceMetadataJson) { + Request request = new Request("POST", "/_plugins/_query/_datasources"); + request.setJsonEntity(dataSourceMetadataJson); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; + } + + private String getDataSourceMetadataJsonString() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("prometheus1"); + dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + dataSourceMetadata.setAllowedRoles(new ArrayList<>()); + Map propertiesMap = new HashMap<>(); + propertiesMap.put("prometheus.uri", "http://localhost:9200"); + dataSourceMetadata.setProperties(propertiesMap); + return new Gson().toJson(dataSourceMetadata); + } + +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 0cfc4a6aa6..360497300e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -323,7 +323,6 @@ private String executeRequest(final String requestBody, final boolean isExplainQ } protected static String executeRequest(final Request request) throws IOException { - Response response = client().performRequest(request); Assert.assertEquals(200, response.getStatusLine().getStatusCode()); return getResponseBody(response); @@ -593,7 +592,12 @@ public enum Index { WILDCARD(TestsConstants.TEST_INDEX_WILDCARD, "wildcard", getMappingFile("wildcard_index_mappings.json"), - "src/test/resources/wildcard.json"),; + "src/test/resources/wildcard.json"), + + DATASOURCES(TestsConstants.DATASOURCES, + "datasource", + getMappingFile("datasources_index_mappings.json"), + "src/test/resources/datasources.json"); private final String name; private final String type; diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index c79314af6a..e46993cd17 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -55,6 +55,7 @@ public class TestsConstants { public final static String TEST_INDEX_NULL_MISSING = TEST_INDEX + "_null_missing"; public final static String TEST_INDEX_CALCS = TEST_INDEX + "_calcs"; public final static String TEST_INDEX_WILDCARD = TEST_INDEX + "_wildcard"; + public final static String DATASOURCES = ".ql-datasources"; public final static String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public final static String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java index 8e7c03777e..a92c3ba50b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java @@ -14,10 +14,20 @@ import java.io.IOException; import org.json.JSONObject; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; public class InformationSchemaCommandIT extends PPLIntegTestCase { + + @Override + protected void init() throws Exception { + loadIndex(Index.DATASOURCES); + } + @Test public void testSearchTablesFromPrometheusCatalog() throws IOException { JSONObject result = diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index 492a40066d..0ed598b742 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -23,6 +23,12 @@ public class PrometheusDataSourceCommandsIT extends PPLIntegTestCase { + + @Override + protected void init() throws Exception { + loadIndex(Index.DATASOURCES); + } + @Test @SneakyThrows public void testSourceMetricCommand() { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java index 3614168ab8..71b891b77c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java @@ -17,6 +17,10 @@ import org.junit.jupiter.api.Test; public class ShowDataSourcesCommandIT extends PPLIntegTestCase { + @Override + protected void init() throws Exception { + loadIndex(Index.DATASOURCES); + } @Test public void testShowDataSourcesCommands() throws IOException { diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index 2a882afbe1..0c900ea234 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -15,6 +15,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import lombok.RequiredArgsConstructor; import org.junit.jupiter.api.Test; @@ -30,8 +31,11 @@ import org.opensearch.sql.analysis.ExpressionAnalyzer; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.DataSourceUserAuthorizationHelper; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.executor.QueryManager; @@ -78,7 +82,7 @@ public void init() { DataSourceService dataSourceService = new DataSourceServiceImpl( new ImmutableSet.Builder() .add(new OpenSearchDataSourceFactory(client, defaultSettings())) - .build()); + .build(), getDataSourceMetadataStorage(), getDataSourceUserRoleHelper()); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); ModulesBuilder modules = new ModulesBuilder(); @@ -233,4 +237,44 @@ public QueryPlanFactory queryPlanFactory(ExecutionEngine executionEngine) { return new QueryPlanFactory(new QueryService(analyzer, executionEngine, planner)); } } + + + private DataSourceMetadataStorage getDataSourceMetadataStorage() { + return new DataSourceMetadataStorage() { + @Override + public List getDataSourceMetadata() { + return Collections.emptyList(); + } + + @Override + public Optional getDataSourceMetadata(String datasourceName) { + return Optional.empty(); + } + + @Override + public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + + } + + @Override + public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + + } + + @Override + public void deleteDataSourceMetadata(String datasourceName) { + + } + }; + } + + private DataSourceUserAuthorizationHelper getDataSourceUserRoleHelper() { + return new DataSourceUserAuthorizationHelper() { + @Override + public void authorizeDataSource(DataSourceMetadata dataSourceMetadata) { + + } + }; + } + } diff --git a/integ-test/src/test/resources/datasources.json b/integ-test/src/test/resources/datasources.json new file mode 100644 index 0000000000..e1e5d5e8bd --- /dev/null +++ b/integ-test/src/test/resources/datasources.json @@ -0,0 +1,2 @@ +{"index":{"_id":"my_prometheus"}} +{ "name" : "my_prometheus", "connector": "prometheus", "properties" : { "prometheus.uri" : "http://localhost:9090"}} diff --git a/integ-test/src/test/resources/indexDefinitions/datasources_index_mappings.json b/integ-test/src/test/resources/indexDefinitions/datasources_index_mappings.json new file mode 100644 index 0000000000..cb45610e6f --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/datasources_index_mappings.json @@ -0,0 +1,17 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "connectorType": { + "type": "keyword" + } + } + } +} \ No newline at end of file diff --git a/legacy/build.gradle b/legacy/build.gradle index db9d6138f0..d3ee13370e 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -90,7 +90,7 @@ dependencies { } implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' implementation group: 'org.json', name: 'json', version:'20180813' - implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}" // add geo module as dependency. https://github.com/opensearch-project/OpenSearch/pull/4180/. implementation group: 'org.opensearch.plugin', name: 'geo', version: "${opensearch_version}" diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 2092bc1ae1..16a719b97e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -27,7 +27,10 @@ public enum MetricName { PPL_REQ_TOTAL("ppl_request_total"), PPL_REQ_COUNT_TOTAL("ppl_request_count"), PPL_FAILED_REQ_COUNT_SYS("ppl_failed_request_count_syserr"), - PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"); + PPL_FAILED_REQ_COUNT_CUS("ppl_failed_request_count_cuserr"), + DATASOURCE_REQ_COUNT("datasource_request_count"), + DATASOURCE_FAILED_REQ_COUNT_SYS("datasource_failed_request_count_syserr"), + DATASOURCE_FAILED_REQ_COUNT_CUS("datasource_failed_request_count_cuserr"); private String name; diff --git a/plugin/build.gradle b/plugin/build.gradle index ed13bff8b8..f78b85ec65 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -81,7 +81,6 @@ dependencyLicenses.enabled = false thirdPartyAudit.enabled = false configurations.all { - resolutionStrategy.force 'junit:junit:4.13.2' // conflict with spring-jcl resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 2.12.6, https://github.com/opensearch-project/sql/issues/424 @@ -113,11 +112,20 @@ dependencies { api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" + api group: 'commons-io', name: 'commons-io', version: '2.8.0' + implementation group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}" + implementation group: 'org.opensearch', name: 'common-utils', version: "${opensearch_build}" api project(":ppl") api project(':legacy') api project(':opensearch') api project(':prometheus') + + testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13' + testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0' + testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' } test { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index e918a90ef1..c194962306 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -7,14 +7,8 @@ import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -45,7 +39,6 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.ReloadablePlugin; import org.opensearch.plugins.ScriptPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; @@ -53,17 +46,17 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.sql.common.encryptor.EncryptorImpl; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.DataSourceServiceImpl; -import org.opensearch.sql.datasource.model.DataSource; -import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.DataSourceUserAuthorizationHelper; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; import org.opensearch.sql.legacy.plugin.RestSqlStatsAction; import org.opensearch.sql.opensearch.client.OpenSearchNodeClient; -import org.opensearch.sql.opensearch.security.SecurityAccess; import org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; @@ -71,12 +64,17 @@ import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; import org.opensearch.sql.plugin.datasource.DataSourceSettings; +import org.opensearch.sql.plugin.datasource.DataSourceUserAuthorizationHelperImpl; +import org.opensearch.sql.plugin.datasource.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.plugin.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.rest.RestDataSourceQueryAction; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; @@ -84,21 +82,16 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin { +public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { private static final Logger LOG = LogManager.getLogger(); - private ClusterService clusterService; - /** * Settings should be inited when bootstrap the plugin. */ private org.opensearch.sql.common.setting.Settings pluginSettings; - private NodeClient client; - private DataSourceService dataSourceService; - private Injector injector; public String name() { @@ -129,7 +122,8 @@ public List getRestHandlers( new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), - new RestQuerySettingsAction(settings, restController)); + new RestQuerySettingsAction(settings, restController), + new RestDataSourceQueryAction()); } /** @@ -140,7 +134,9 @@ public List getRestHandlers( return Arrays.asList( new ActionHandler<>( new ActionType<>(PPLQueryAction.NAME, TransportPPLQueryResponse::new), - TransportPPLQueryAction.class)); + TransportPPLQueryAction.class), + new ActionHandler<>(new ActionType<>(TransportCreateDataSourceAction.NAME, + CreateDataSourceActionResponse::new), TransportCreateDataSourceAction.class)); } @Override @@ -159,15 +155,23 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; + String masterKey = DataSourceSettings + .DATASOURCE_MASTER_SECRET_KEY.get(clusterService.getSettings()); + DataSourceMetadataStorage dataSourceMetadataStorage + = new OpenSearchDataSourceMetadataStorage(client, clusterService, + new EncryptorImpl(masterKey)); + DataSourceUserAuthorizationHelper dataSourceUserAuthorizationHelper + = new DataSourceUserAuthorizationHelperImpl(client); this.dataSourceService = new DataSourceServiceImpl( new ImmutableSet.Builder() .add(new OpenSearchDataSourceFactory( - new OpenSearchNodeClient(this.client), pluginSettings)) + new OpenSearchNodeClient(this.client), pluginSettings)) .add(new PrometheusStorageFactory()) - .build()); + .build(), + dataSourceMetadataStorage, + dataSourceUserAuthorizationHelper); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); - loadDataSources(dataSourceService, clusterService.getSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -200,6 +204,7 @@ public List> getSettings() { .addAll(LegacyOpenDistroSettings.legacySettings()) .addAll(OpenSearchSettings.pluginSettings()) .add(DataSourceSettings.DATASOURCE_CONFIG) + .add(DataSourceSettings.DATASOURCE_MASTER_SECRET_KEY) .build(); } @@ -208,36 +213,4 @@ public ScriptEngine getScriptEngine(Settings settings, Collection { - InputStream inputStream = DataSourceSettings.DATASOURCE_CONFIG.get(settings); - if (inputStream != null) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - try { - List metadataList = - objectMapper.readValue(inputStream, new TypeReference<>() {}); - dataSourceService.createDataSource(metadataList.toArray(new DataSourceMetadata[0])); - } catch (IOException e) { - LOG.error( - "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e); - } catch (Throwable e) { - LOG.error("DataSource construction failed.", e); - } - } - return null; - }); - } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java index 9a3466df45..a451ad30be 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java @@ -14,4 +14,10 @@ public class DataSourceSettings { public static final Setting DATASOURCE_CONFIG = SecureSetting.secureFile( "plugins.query.federation.datasources.config", null); + + public static final Setting DATASOURCE_MASTER_SECRET_KEY = Setting.simpleString( + "plugins.query.datasources.encryption.masterkey", + "0000000000000000", + Setting.Property.NodeScope, + Setting.Property.Dynamic); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImpl.java new file mode 100644 index 0000000000..41ad450f68 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImpl.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.util.List; +import lombok.AllArgsConstructor; +import org.opensearch.client.Client; +import org.opensearch.commons.authuser.User; +import org.opensearch.sql.datasource.DataSourceUserAuthorizationHelper; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +@AllArgsConstructor +public class DataSourceUserAuthorizationHelperImpl implements DataSourceUserAuthorizationHelper { + private final Client client; + + private Boolean isAuthorizationRequired() { + String userString = client.threadPool() + .getThreadContext().getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT); + return userString != null; + } + + private List getUserRoles() { + String userString = client.threadPool() + .getThreadContext().getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT); + User user = User.parse(userString); + return user.getRoles(); + } + + + @Override + public void authorizeDataSource(DataSourceMetadata dataSourceMetadata) { + if (isAuthorizationRequired() + && !dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + boolean isAuthorized = false; + for (String role : getUserRoles()) { + if (dataSourceMetadata.getAllowedRoles().contains(role) + || role.equals("all_access")) { + isAuthorized = true; + break; + } + } + if (!isAuthorized) { + throw new SecurityException( + String.format("User is not authorized to access datasource %s. " + + "User should be mapped to any of the roles in %s for access.", + dataSourceMetadata.getName(), dataSourceMetadata.getAllowedRoles().toString())); + } + } + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java new file mode 100644 index 0000000000..b3c433f7e6 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorage.java @@ -0,0 +1,244 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.datasource; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.encryptor.Encryptor; +import org.opensearch.sql.datasource.DataSourceMetadataStorage; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.auth.AuthenticationType; +import org.opensearch.sql.plugin.utils.XContentParserUtils; + +public class OpenSearchDataSourceMetadataStorage implements DataSourceMetadataStorage { + + public static final String DATASOURCE_INDEX_NAME = ".ql-datasources"; + private static final String DATASOURCE_INDEX_MAPPING_FILE_NAME = "datasources-index-mapping.yml"; + private static final String DATASOURCE_INDEX_SETTINGS_FILE_NAME + = "datasources-index-settings.yml"; + private static final Logger LOG = LogManager.getLogger(); + private final Client client; + private final ClusterService clusterService; + + private final Encryptor encryptor; + + /** + * This class implements DataSourceMetadataStorage interface + * using OpenSearch as underlying storage. + * + * @param client opensearch NodeClient. + * @param clusterService ClusterService. + * @param encryptor Encryptor. + */ + public OpenSearchDataSourceMetadataStorage(Client client, ClusterService clusterService, + Encryptor encryptor) { + this.client = client; + this.clusterService = clusterService; + this.encryptor = encryptor; + } + + @Override + public List getDataSourceMetadata() { + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + return Collections.emptyList(); + } + return searchInDataSourcesIndex(QueryBuilders.matchAllQuery()); + } + + @Override + public Optional getDataSourceMetadata(String datasourceName) { + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + } + return searchInDataSourcesIndex(QueryBuilders.termQuery("name", datasourceName)) + .stream() + .findFirst() + .map(x -> this.encryptDecryptAuthenticationData(x, false)); + } + + @Override + public void createDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + encryptDecryptAuthenticationData(dataSourceMetadata, true); + if (!this.clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) { + createDataSourcesIndex(); + } + IndexRequest indexRequest = new IndexRequest(DATASOURCE_INDEX_NAME); + indexRequest.id(dataSourceMetadata.getName()); + ActionFuture indexResponseActionFuture; + try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() + .stashContext()) { + indexRequest.source(XContentParserUtils.convertToXContent(dataSourceMetadata)); + indexResponseActionFuture = client.index(indexRequest); + } catch (Exception e) { + throw new RuntimeException(e); + } + IndexResponse indexResponse = indexResponseActionFuture.actionGet(); + if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { + LOG.debug("DatasourceMetadata : {} successfully created", dataSourceMetadata.getName()); + } + } + + @Override + public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) { + throw new UnsupportedOperationException("will be supported in future."); + } + + @Override + public void deleteDataSourceMetadata(String datasourceName) { + throw new UnsupportedOperationException("will be supported in future."); + } + + private void createDataSourcesIndex() { + try { + InputStream mappingFileStream = OpenSearchDataSourceMetadataStorage.class.getClassLoader() + .getResourceAsStream(DATASOURCE_INDEX_MAPPING_FILE_NAME); + InputStream settingsFileStream = OpenSearchDataSourceMetadataStorage.class.getClassLoader() + .getResourceAsStream(DATASOURCE_INDEX_SETTINGS_FILE_NAME); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(DATASOURCE_INDEX_NAME); + createIndexRequest + .mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), + XContentType.YAML) + .settings(IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), + XContentType.YAML); + ActionFuture createIndexResponseActionFuture; + try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() + .stashContext()) { + createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest); + } + CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); + if (createIndexResponse.isAcknowledged()) { + LOG.info("Index: {} creation Acknowledged", DATASOURCE_INDEX_NAME); + } else { + throw new IllegalStateException( + String.format("Index: %s creation failed", DATASOURCE_INDEX_NAME)); + } + } catch (Throwable e) { + throw new RuntimeException( + "Internal server error while creating" + DATASOURCE_INDEX_NAME + " index" + + e.getMessage()); + } + } + + private List searchInDataSourcesIndex(QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(DATASOURCE_INDEX_NAME); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchRequest.source(searchSourceBuilder); + ActionFuture searchResponseActionFuture; + try (ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext() + .stashContext()) { + searchResponseActionFuture = client.search(searchRequest); + } + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException( + "Internal server error while fetching datasource metadata information"); + } else { + List list = new ArrayList<>(); + for (SearchHit documentFields : searchResponse.getHits().getHits()) { + String sourceAsString = documentFields.getSourceAsString(); + DataSourceMetadata dataSourceMetadata; + try { + dataSourceMetadata = XContentParserUtils.toDataSourceMetadata(sourceAsString); + } catch (IOException e) { + throw new RuntimeException(e); + } + list.add(dataSourceMetadata); + } + return list; + } + } + + private DataSourceMetadata encryptDecryptAuthenticationData(DataSourceMetadata dataSourceMetadata, + Boolean isEncryption) { + Map propertiesMap = dataSourceMetadata.getProperties(); + Optional authTypeOptional + = propertiesMap.keySet().stream().filter(s -> s.endsWith("auth.type")) + .findFirst() + .map(propertiesMap::get) + .map(AuthenticationType::get); + if (authTypeOptional.isPresent()) { + switch (authTypeOptional.get()) { + case BASICAUTH: + handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption); + break; + case AWSSIGV4AUTH: + handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption); + break; + default: + break; + } + } + return dataSourceMetadata; + } + + private void handleBasicAuthPropertiesEncryptionDecryption(Map propertiesMap, + Boolean isEncryption) { + Optional usernameKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.username")) + .findFirst(); + Optional passwordKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.password")) + .findFirst(); + encryptOrDecrypt(propertiesMap, isEncryption, + Arrays.asList(usernameKey.get(), passwordKey.get())); + } + + private void encryptOrDecrypt(Map propertiesMap, Boolean isEncryption, + List keyIdentifiers) { + for (String key : keyIdentifiers) { + if (isEncryption) { + propertiesMap.put(key, + this.encryptor.encrypt(propertiesMap.get(key))); + } else { + propertiesMap.put(key, + this.encryptor.decrypt(propertiesMap.get(key))); + } + } + } + + private void handleSigV4PropertiesEncryptionDecryption(Map propertiesMap, + Boolean isEncryption) { + Optional accessKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.access_key")) + .findFirst(); + Optional secretKey = propertiesMap.keySet().stream() + .filter(s -> s.endsWith("auth.secret_key")) + .findFirst(); + encryptOrDecrypt(propertiesMap, isEncryption, Arrays.asList(accessKey.get(), secretKey.get())); + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionRequest.java new file mode 100644 index 0000000000..d6a15e3a0c --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionRequest.java @@ -0,0 +1,49 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import java.io.IOException; +import lombok.Getter; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +public class CreateDataSourceActionRequest + extends ActionRequest { + + @Getter + private DataSourceMetadata dataSourceMetadata; + + /** + * Constructor of CreateDataSourceActionRequest from StreamInput. + */ + public CreateDataSourceActionRequest(StreamInput in) throws IOException { + super(in); + } + + public CreateDataSourceActionRequest(DataSourceMetadata dataSourceMetadata) { + this.dataSourceMetadata = dataSourceMetadata; + } + + @Override + public ActionRequestValidationException validate() { + if (this.dataSourceMetadata.getName().equals(DEFAULT_DATASOURCE_NAME)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception + .addValidationError( + "Not allowed to create datasource with name : " + DEFAULT_DATASOURCE_NAME); + return exception; + } else { + return null; + } + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionResponse.java new file mode 100644 index 0000000000..1d8d9aa9b7 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/model/CreateDataSourceActionResponse.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class CreateDataSourceActionResponse + extends ActionResponse { + + @Getter + private final String result; + + public CreateDataSourceActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java new file mode 100644 index 0000000000..7314362e39 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestDataSourceQueryAction.java @@ -0,0 +1,131 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.rest; + +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; +import static org.opensearch.sql.plugin.utils.Scheduler.schedule; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; +import org.opensearch.sql.plugin.model.CreateDataSourceActionRequest; +import org.opensearch.sql.plugin.model.CreateDataSourceActionResponse; +import org.opensearch.sql.plugin.transport.datasource.TransportCreateDataSourceAction; +import org.opensearch.sql.plugin.utils.XContentParserUtils; + +public class RestDataSourceQueryAction extends BaseRestHandler { + + public static final String DATASOURCE_ACTIONS = "datasource_actions"; + public static final String BASE_DATASOURCE_ACTION_URL = "/_plugins/_query/_datasources"; + + private static final Logger LOG = LogManager.getLogger(RestDataSourceQueryAction.class); + + @Override + public String getName() { + return DATASOURCE_ACTIONS; + } + + @Override + public List routes() { + return ImmutableList.of( + + /* + * + * Create a new datasource. + * Request URL: POST + * Request body: + * Ref [org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionRequest] + * Response body: + * Ref [org.opensearch.sql.plugin.transport.datasource.model.CreateDataSourceActionResponse] + */ + new Route(POST, BASE_DATASOURCE_ACTION_URL) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) + throws IOException { + switch (restRequest.method()) { + case POST: + return executePostRequest(restRequest, nodeClient); + default: + return restChannel + -> restChannel.sendResponse(new BytesRestResponse(RestStatus.METHOD_NOT_ALLOWED, + String.valueOf(restRequest.method()))); + } + } + + private RestChannelConsumer executePostRequest(RestRequest restRequest, + NodeClient nodeClient) throws IOException { + + DataSourceMetadata dataSourceMetadata + = XContentParserUtils.toDataSourceMetadata(restRequest.contentParser()); + return restChannel -> schedule(nodeClient, + () -> nodeClient.execute(TransportCreateDataSourceAction.ACTION_TYPE, + new CreateDataSourceActionRequest(dataSourceMetadata), + new ActionListener<>() { + @Override + public void onResponse( + CreateDataSourceActionResponse createDataSourceActionResponse) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.OK, "application/json; charset=UTF-8", + createDataSourceActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof IllegalAccessException) { + reportError(restChannel, e, BAD_REQUEST); + } else { + LOG.error("Error happened during query handling", e); + if (isClientError(e)) { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_CUS) + .increment(); + reportError(restChannel, e, BAD_REQUEST); + } else { + Metrics.getInstance() + .getNumericalMetric(MetricName.DATASOURCE_FAILED_REQ_COUNT_SYS) + .increment(); + reportError(restChannel, e, SERVICE_UNAVAILABLE); + } + } + } + })); + } + + private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { + channel.sendResponse( + new BytesRestResponse( + status, ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString())); + } + + private static boolean isClientError(Exception e) { + return e instanceof NullPointerException + // NPE is hard to differentiate but more likely caused by bad query + || e instanceof IllegalArgumentException + || e instanceof IndexNotFoundException; + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java new file mode 100644 index 0000000000..006837c256 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/datasource/TransportCreateDataSourceAction.java @@ -0,0 +1,73 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.plugin.transport.datasource; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.plugin.model.CreateDataSourceActionRequest; +import org.opensearch.sql.plugin.model.CreateDataSourceActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportCreateDataSourceAction + extends HandledTransportAction { + + private static final Logger LOG = LogManager.getLogger(); + public static final String NAME = "cluster:admin/opensearch/datasources/create"; + public static final ActionType + ACTION_TYPE = new ActionType<>(NAME, CreateDataSourceActionResponse::new); + + private DataSourceService dataSourceService; + private Client client; + + /** + * TransportCreateDataSourceAction action for creating datasource. + * + * @param transportService transportService. + * @param actionFilters actionFilters. + * @param client client. + * @param dataSourceService dataSourceService. + */ + @Inject + public TransportCreateDataSourceAction(TransportService transportService, + ActionFilters actionFilters, + NodeClient client, + DataSourceServiceImpl dataSourceService) { + super(TransportCreateDataSourceAction.NAME, transportService, actionFilters, + CreateDataSourceActionRequest::new); + this.dataSourceService = dataSourceService; + this.client = client; + } + + @Override + protected void doExecute(Task task, CreateDataSourceActionRequest request, + ActionListener actionListener) { + + Metrics.getInstance().getNumericalMetric(MetricName.DATASOURCE_REQ_COUNT).increment(); + actionListener.onResponse(execute(request.getDataSourceMetadata())); + } + + private CreateDataSourceActionResponse execute(DataSourceMetadata dataSourceMetadata) { + dataSourceService.createDataSource(dataSourceMetadata); + return new CreateDataSourceActionResponse("Created DataSource with name " + + dataSourceMetadata.getName()); + } + +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/utils/Scheduler.java b/plugin/src/main/java/org/opensearch/sql/plugin/utils/Scheduler.java new file mode 100644 index 0000000000..a4a87b1b12 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/utils/Scheduler.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.utils; + +import java.util.Map; +import lombok.experimental.UtilityClass; +import org.apache.logging.log4j.ThreadContext; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +/** The scheduler which schedule the task run in sql-worker thread pool. */ +@UtilityClass +public class Scheduler { + + public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + + public static void schedule(NodeClient client, Runnable task) { + ThreadPool threadPool = client.threadPool(); + threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); + } + + private static Runnable withCurrentContext(final Runnable task) { + final Map currentContext = ThreadContext.getImmutableContext(); + return () -> { + ThreadContext.putAll(currentContext); + task.run(); + }; + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/utils/XContentParserUtils.java b/plugin/src/main/java/org/opensearch/sql/plugin/utils/XContentParserUtils.java new file mode 100644 index 0000000000..cc1310ffc3 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/utils/XContentParserUtils.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.utils; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +public class XContentParserUtils { + public static final String NAME_FIELD = "name"; + public static final String CONNECTOR_FIELD = "connector"; + public static final String PROPERTIES_FIELD = "properties"; + public static final String ALLOWED_ROLES_FIELD = "allowedRoles"; + + /** + * Convert xcontent parser to DataSourceMetadata. + * + * @param parser parser. + * @return DataSourceMetadata {@link DataSourceMetadata} + * @throws IOException IOException. + */ + public static DataSourceMetadata toDataSourceMetadata(XContentParser parser) throws IOException { + String name = null; + DataSourceType connector = null; + List allowedRoles = new ArrayList<>(); + Map properties = new HashMap<>(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case NAME_FIELD: + name = parser.textOrNull(); + break; + case CONNECTOR_FIELD: + connector = DataSourceType.fromString(parser.textOrNull()); + break; + case ALLOWED_ROLES_FIELD: + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + allowedRoles.add(parser.text()); + } + break; + case PROPERTIES_FIELD: + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String key = parser.currentName(); + parser.nextToken(); + String value = parser.textOrNull(); + properties.put(key, value); + } + break; + default: + throw new IllegalArgumentException("Unknown field: " + fieldName); + } + } + if (name == null || connector == null) { + throw new IllegalArgumentException("Missing required fields"); + } + return new DataSourceMetadata(name, connector, allowedRoles, properties); + } + + /** + * Converts json string to DataSourceMetadata. + * + * @param json jsonstring. + * @return DataSourceMetadata {@link DataSourceMetadata} + * @throws IOException IOException. + */ + public static DataSourceMetadata toDataSourceMetadata(String json) throws IOException { + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + json)) { + return toDataSourceMetadata(parser); + } + } + + /** + * Converts DataSourceMetadata to XContentBuilder. + * + * @param metadata metadata. + * @return XContentBuilder {@link XContentBuilder} + * @throws Exception Exception. + */ + public static XContentBuilder convertToXContent(DataSourceMetadata metadata) throws Exception { + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.field(NAME_FIELD, metadata.getName()); + builder.field(CONNECTOR_FIELD, metadata.getConnector().name()); + builder.field(ALLOWED_ROLES_FIELD, metadata.getAllowedRoles().toArray()); + builder.startObject(PROPERTIES_FIELD); + for (Map.Entry entry : metadata.getProperties().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + builder.endObject(); + return builder; + } + + +} diff --git a/plugin/src/main/resources/datasources-index-mapping.yml b/plugin/src/main/resources/datasources-index-mapping.yml new file mode 100644 index 0000000000..584ef00481 --- /dev/null +++ b/plugin/src/main/resources/datasources-index-mapping.yml @@ -0,0 +1,19 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Schema file for the observability index +# Since we only search based on "access", sort on lastUpdatedTimeMs & createdTimeMs, +# other fields are not used in mapping to avoid index on those fields. +# Also "dynamic" is set to "false" so that other fields can be added. +dynamic: false +properties: + name: + type: text + fields: + keyword: + type: keyword + connectorType: + type: keyword \ No newline at end of file diff --git a/plugin/src/main/resources/datasources-index-settings.yml b/plugin/src/main/resources/datasources-index-settings.yml new file mode 100644 index 0000000000..c01b2b3376 --- /dev/null +++ b/plugin/src/main/resources/datasources-index-settings.yml @@ -0,0 +1,11 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Settings file for the observability index +index: + number_of_shards: "1" + auto_expand_replicas: "0-2" + number_of_replicas: "0" \ No newline at end of file diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java deleted file mode 100644 index 3ccb1cd403..0000000000 --- a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceMetaDataTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.datasource; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableMap; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import lombok.SneakyThrows; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.common.settings.MockSecureSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.plugin.SQLPlugin; - -@RunWith(MockitoJUnitRunner.class) -public class DataSourceMetaDataTest { - - public static final String DATASOURCE_SETTING_METADATA_KEY = - "plugins.query.federation.datasources.config"; - - @Mock - private DataSourceService dataSourceService; - - @SneakyThrows - @Test - public void testLoadConnectors() { - Settings settings = getDataSourceSettings("datasources.json"); - loadConnectors(settings); - List expected = - new ArrayList<>() { - { - add( - metadata( - "prometheus", - DataSourceType.PROMETHEUS, - ImmutableMap.of( - "prometheus.uri", "http://localhost:9090", - "prometheus.auth.type", "basicauth", - "prometheus.auth.username", "admin", - "prometheus.auth.password", "type"))); - } - }; - - verifyAddDataSourceWithMetadata(expected); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMultipleDataSources() { - Settings settings = getDataSourceSettings("multiple_datasources.json"); - loadConnectors(settings); - List expected = new ArrayList<>() {{ - add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of( - "prometheus.uri", "http://localhost:9090", - "prometheus.auth.type", "basicauth", - "prometheus.auth.username", "admin", - "prometheus.auth.password", "type" - ))); - add(metadata("prometheus-1", DataSourceType.PROMETHEUS, ImmutableMap.of( - "prometheus.uri", "http://localhost:9090", - "prometheus.auth.type", "awssigv4", - "prometheus.auth.region", "us-east-1", - "prometheus.auth.access_key", "accessKey", - "prometheus.auth.secret_key", "secretKey" - ))); - }}; - - verifyAddDataSourceWithMetadata(expected); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMalformedJson() { - Settings settings = getDataSourceSettings("malformed_datasources.json"); - loadConnectors(settings); - - verify(dataSourceService, never()).createDataSource(any()); - } - - private Settings getDataSourceSettings(String filename) throws URISyntaxException, IOException { - MockSecureSettings mockSecureSettings = new MockSecureSettings(); - ClassLoader classLoader = getClass().getClassLoader(); - Path filepath = Paths.get(classLoader.getResource(filename).toURI()); - mockSecureSettings.setFile(DATASOURCE_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); - return Settings.builder().setSecureSettings(mockSecureSettings).build(); - } - - void loadConnectors(Settings settings) { - SQLPlugin.loadDataSources(dataSourceService, settings); - } - - void verifyAddDataSourceWithMetadata(List metadataList) { - ArgumentCaptor metadataCaptor = - ArgumentCaptor.forClass(DataSourceMetadata[].class); - verify(dataSourceService, times(1)).createDataSource(metadataCaptor.capture()); - List actualValues = Arrays.asList(metadataCaptor.getValue()); - assertEquals(metadataList.size(), actualValues.size()); - assertEquals(metadataList, actualValues); - } - - DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { - DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); - dataSourceMetadata.setName(name); - dataSourceMetadata.setConnector(type); - dataSourceMetadata.setProperties(properties); - return dataSourceMetadata; - } -} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImplTest.java new file mode 100644 index 0000000000..fe57b06c6e --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/DataSourceUserAuthorizationHelperImplTest.java @@ -0,0 +1,80 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.mockito.Mockito.when; +import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT; + +import java.util.HashMap; +import java.util.List; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.client.Client; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +@RunWith(MockitoJUnitRunner.class) +public class DataSourceUserAuthorizationHelperImplTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + + @InjectMocks + private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; + + + @Test + public void testAuthorizeDataSourceWithAllowedRoles() { + String userString = "myuser|bckrole1,bckrol2|prometheus_access|myTenant"; + when(client.threadPool().getThreadContext() + .getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)) + .thenReturn(userString); + DataSourceMetadata dataSourceMetadata = dataSourceMetadata(); + this.dataSourceUserAuthorizationHelper + .authorizeDataSource(dataSourceMetadata); + } + + @Test + public void testAuthorizeDataSourceWithAdminRole() { + String userString = "myuser|bckrole1,bckrol2|all_access|myTenant"; + when(client.threadPool().getThreadContext() + .getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)) + .thenReturn(userString); + DataSourceMetadata dataSourceMetadata = dataSourceMetadata(); + this.dataSourceUserAuthorizationHelper + .authorizeDataSource(dataSourceMetadata); + } + + @Test + public void testAuthorizeDataSourceWithException() { + String userString = "myuser|bckrole1,bckrol2|role1|myTenant"; + when(client.threadPool().getThreadContext() + .getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)) + .thenReturn(userString); + DataSourceMetadata dataSourceMetadata = dataSourceMetadata(); + SecurityException securityException + = Assert.assertThrows(SecurityException.class, + () -> this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata)); + Assert.assertEquals("User is not authorized to access datasource test. " + + "User should be mapped to any of the roles in [prometheus_access] for access.", + securityException.getMessage()); + } + + private DataSourceMetadata dataSourceMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("test"); + dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); + dataSourceMetadata.setProperties(new HashMap<>()); + return dataSourceMetadata; + } + +} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java new file mode 100644 index 0000000000..140d4e0edd --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/OpenSearchDataSourceMetadataStorageTest.java @@ -0,0 +1,220 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.plugin.datasource.OpenSearchDataSourceMetadataStorage.DATASOURCE_INDEX_NAME; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.SneakyThrows; +import org.apache.lucene.search.TotalHits; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.sql.common.encryptor.Encryptor; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +@RunWith(MockitoJUnitRunner.class) +public class OpenSearchDataSourceMetadataStorageTest { + + private static final String TEST_DATASOURCE_INDEX_NAME = "testDS"; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ClusterService clusterService; + @Mock + private Encryptor encryptor; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private SearchResponse searchResponse; + @Mock + private ActionFuture searchResponseActionFuture; + @Mock + private ActionFuture createIndexResponseActionFuture; + @Mock + private SearchHit searchHit; + @InjectMocks + private OpenSearchDataSourceMetadataStorage openSearchDataSourceMetadataStorage; + + + @SneakyThrows + @Test + public void testGetDataSourceMetadata() { + when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + when(client.search(any())).thenReturn(searchResponseActionFuture); + when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); + when(searchResponse.status()).thenReturn(RestStatus.OK); + when(searchResponse.getHits()) + .thenReturn( + new SearchHits( + new SearchHit[] {searchHit}, + new TotalHits(21, TotalHits.Relation.EQUAL_TO), + 1.0F)); + when(searchHit.getSourceAsString()) + .thenReturn(getBasicDataSourceMetadataString()); + when(encryptor.decrypt("password")).thenReturn("password"); + when(encryptor.decrypt("username")).thenReturn("username"); + + Optional dataSourceMetadataOptional + = openSearchDataSourceMetadataStorage.getDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME); + + + assertFalse(dataSourceMetadataOptional.isEmpty()); + DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); + assertEquals(TEST_DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); + assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata.getConnector()); + assertEquals("password", + dataSourceMetadata.getProperties().get("prometheus.auth.password")); + assertEquals("username", + dataSourceMetadata.getProperties().get("prometheus.auth.username")); + assertEquals("basicauth", + dataSourceMetadata.getProperties().get("prometheus.auth.type")); + } + + @SneakyThrows + @Test + public void testGetDataSourceMetadataWithAWSSigV4() { + when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(true); + when(client.search(any())).thenReturn(searchResponseActionFuture); + when(searchResponseActionFuture.actionGet()).thenReturn(searchResponse); + when(searchResponse.status()).thenReturn(RestStatus.OK); + when(searchResponse.getHits()) + .thenReturn( + new SearchHits( + new SearchHit[] {searchHit}, + new TotalHits(21, TotalHits.Relation.EQUAL_TO), + 1.0F)); + when(searchHit.getSourceAsString()) + .thenReturn(getAWSSigv4DataSourceMetadataString()); + when(encryptor.decrypt("secret_key")).thenReturn("secret_key"); + when(encryptor.decrypt("access_key")).thenReturn("access_key"); + + Optional dataSourceMetadataOptional + = openSearchDataSourceMetadataStorage.getDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME); + + + assertFalse(dataSourceMetadataOptional.isEmpty()); + DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get(); + assertEquals(TEST_DATASOURCE_INDEX_NAME, dataSourceMetadata.getName()); + assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata.getConnector()); + assertEquals("secret_key", + dataSourceMetadata.getProperties().get("prometheus.auth.secret_key")); + assertEquals("access_key", + dataSourceMetadata.getProperties().get("prometheus.auth.access_key")); + assertEquals("awssigv4", + dataSourceMetadata.getProperties().get("prometheus.auth.type")); + } + + @Test + public void testCreateDataSourceMetadata() { + + when(clusterService.state().routingTable().hasIndex(DATASOURCE_INDEX_NAME)) + .thenReturn(Boolean.FALSE); + when(encryptor.encrypt("secret_key")).thenReturn("secret_key"); + when(encryptor.encrypt("access_key")).thenReturn("access_key"); + when(client.admin().indices().create(any())) + .thenReturn(createIndexResponseActionFuture); + when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(true, true, DATASOURCE_INDEX_NAME)); + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(); + + this.openSearchDataSourceMetadataStorage.createDataSourceMetadata(dataSourceMetadata); + + verify(encryptor, times(1)).encrypt("secret_key"); + verify(encryptor, times(1)).encrypt("access_key"); + verify(client.admin().indices(), times(1)).create(any()); + verify(client, times(1)).index(any()); + verify(client.threadPool().getThreadContext(), times(2)).stashContext(); + + + } + + @Test + public void testUpdateDataSourceMetadata() { + assertThrows( + UnsupportedOperationException.class, + () -> openSearchDataSourceMetadataStorage + .updateDataSourceMetadata(new DataSourceMetadata())); + } + + @Test + public void testDeleteDataSourceMetadata() { + assertThrows( + UnsupportedOperationException.class, + () -> openSearchDataSourceMetadataStorage + .deleteDataSourceMetadata(TEST_DATASOURCE_INDEX_NAME)); + } + + private String getBasicDataSourceMetadataString() throws JsonProcessingException { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("testDS"); + dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); + Map properties = new HashMap<>(); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "username"); + properties.put("prometheus.auth.uri", "https://localhost:9090"); + properties.put("prometheus.auth.password", "password"); + dataSourceMetadata.setProperties(properties); + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(dataSourceMetadata); + } + + private String getAWSSigv4DataSourceMetadataString() throws JsonProcessingException { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("testDS"); + dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); + Map properties = new HashMap<>(); + properties.put("prometheus.auth.type", "awssigv4"); + properties.put("prometheus.auth.secret_key", "secret_key"); + properties.put("prometheus.auth.uri", "https://localhost:9090"); + properties.put("prometheus.auth.access_key", "access_key"); + dataSourceMetadata.setProperties(properties); + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(dataSourceMetadata); + } + + private DataSourceMetadata getDataSourceMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("testDS"); + dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); + dataSourceMetadata.setAllowedRoles(Collections.singletonList("prometheus_access")); + Map properties = new HashMap<>(); + properties.put("prometheus.auth.type", "awssigv4"); + properties.put("prometheus.auth.secret_key", "secret_key"); + properties.put("prometheus.auth.uri", "https://localhost:9090"); + properties.put("prometheus.auth.access_key", "access_key"); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } + +} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/utils/SchedulerTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/utils/SchedulerTest.java new file mode 100644 index 0000000000..c86f3341b1 --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/utils/SchedulerTest.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.utils; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.client.node.NodeClient; +import org.opensearch.threadpool.ThreadPool; + +@RunWith(MockitoJUnitRunner.class) +public class SchedulerTest { + + @Mock + private NodeClient nodeClient; + + @Mock + private ThreadPool threadPool; + + @Test + public void testSchedule() { + when(nodeClient.threadPool()).thenReturn(threadPool); + + doAnswer( + invocation -> { + Runnable task = invocation.getArgument(0); + task.run(); + return null; + }) + .when(threadPool) + .schedule(any(), any(), any()); + AtomicBoolean isRun = new AtomicBoolean(false); + Scheduler.schedule(nodeClient, () -> isRun.set(true)); + assertTrue(isRun.get()); + } + +} \ No newline at end of file diff --git a/plugin/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/plugin/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..ca6ee9cea8 --- /dev/null +++ b/plugin/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index d65f315c8a..dbc753f1f5 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -11,6 +11,8 @@ import com.amazonaws.auth.BasicAWSCredentials; import java.net.URI; import java.net.URISyntaxException; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -53,13 +55,16 @@ public DataSource createDataSource(DataSourceMetadata metadata) { StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { validateFieldsInConfig(requiredConfig, Set.of(URI)); PrometheusClient prometheusClient; - try { - prometheusClient = new PrometheusClientImpl(getHttpClient(requiredConfig), - new URI(requiredConfig.get(URI))); - } catch (URISyntaxException e) { - throw new RuntimeException( - String.format("Prometheus Client creation failed due to: %s", e.getMessage())); - } + prometheusClient = + AccessController.doPrivileged((PrivilegedAction) () -> { + try { + return new PrometheusClientImpl(getHttpClient(requiredConfig), + new URI(requiredConfig.get(URI))); + } catch (URISyntaxException e) { + throw new RuntimeException( + String.format("Prometheus Client creation failed due to: %s", e.getMessage())); + } + }); return new PrometheusStorageEngine(prometheusClient); }