diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 7d0a452e1b..aefaf2674c 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -11,18 +11,10 @@ import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC; import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC; import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; -import static org.opensearch.sql.utils.MLCommonsConstants.ACTION; -import static org.opensearch.sql.utils.MLCommonsConstants.MODELID; -import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE; -import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP; -import static org.opensearch.sql.utils.MLCommonsConstants.STATUS; -import static org.opensearch.sql.utils.MLCommonsConstants.TASKID; import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD; -import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN; -import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT; import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME; import com.google.common.collect.ImmutableList; @@ -69,8 +61,8 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Values; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.exception.SemanticCheckException; @@ -101,7 +93,7 @@ import org.opensearch.sql.planner.logical.LogicalRename; import org.opensearch.sql.planner.logical.LogicalSort; import org.opensearch.sql.planner.logical.LogicalValues; -import org.opensearch.sql.planner.physical.catalog.CatalogTable; +import org.opensearch.sql.planner.physical.datasource.DataSourceTable; import org.opensearch.sql.storage.Table; import org.opensearch.sql.utils.ParseUtils; @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor private final NamedExpressionAnalyzer namedExpressionAnalyzer; - private final CatalogService catalogService; + private final DataSourceService dataSourceService; private final BuiltinFunctionRepository repository; @@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor */ public Analyzer( ExpressionAnalyzer expressionAnalyzer, - CatalogService catalogService, + DataSourceService dataSourceService, BuiltinFunctionRepository repository) { this.expressionAnalyzer = expressionAnalyzer; - this.catalogService = catalogService; + this.dataSourceService = dataSourceService; this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer); this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer); this.repository = repository; @@ -142,25 +134,25 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) { @Override public LogicalPlan visitRelation(Relation node, AnalysisContext context) { QualifiedName qualifiedName = node.getTableQualifiedName(); - Set allowedCatalogNames = catalogService.getCatalogs() + Set allowedCatalogNames = dataSourceService.getDataSources() .stream() - .map(Catalog::getName) + .map(DataSource::getName) .collect(Collectors.toSet()); - CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver - = new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); - String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName(); + DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver + = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); + String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName(); context.push(); TypeEnvironment curEnv = context.peek(); Table table; if (CATALOGS_TABLE_NAME.equals(tableName)) { - table = new CatalogTable(catalogService); + table = new DataSourceTable(dataSourceService); } else { - table = catalogService - .getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName()) + table = dataSourceService + .getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName()) .getStorageEngine() - .getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(), - catalogSchemaIdentifierNameResolver.getSchemaName()), - catalogSchemaIdentifierNameResolver.getIdentifierName()); + .getTable(new CatalogSchemaName(dataSourceSchemaIdentifierNameResolver.getDataSourceName(), + dataSourceSchemaIdentifierNameResolver.getSchemaName()), + dataSourceSchemaIdentifierNameResolver.getIdentifierName()); } table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); @@ -188,28 +180,28 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext @Override public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) { QualifiedName qualifiedName = node.getFunctionName(); - Set allowedCatalogNames = catalogService.getCatalogs() + Set allowedCatalogNames = dataSourceService.getDataSources() .stream() - .map(Catalog::getName) + .map(DataSource::getName) .collect(Collectors.toSet()); - CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver - = new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); + DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver + = new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); FunctionName functionName - = FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName()); + = FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName()); List arguments = node.getArguments().stream() .map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context)) .collect(Collectors.toList()); TableFunctionImplementation tableFunctionImplementation = (TableFunctionImplementation) repository.compile( - catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments); + dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments); context.push(); TypeEnvironment curEnv = context.peek(); Table table = tableFunctionImplementation.applyArguments(); table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); curEnv.define(new Symbol(Namespace.INDEX_NAME, - catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT); - return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(), + dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT); + return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(), tableFunctionImplementation.applyArguments()); } diff --git a/core/src/main/java/org/opensearch/sql/analysis/CatalogSchemaIdentifierNameResolver.java b/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java similarity index 73% rename from core/src/main/java/org/opensearch/sql/analysis/CatalogSchemaIdentifierNameResolver.java rename to core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java index 7e0d2af028..c1ba82539e 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/CatalogSchemaIdentifierNameResolver.java +++ b/core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java @@ -10,13 +10,13 @@ import java.util.List; import java.util.Set; -public class CatalogSchemaIdentifierNameResolver { +public class DataSourceSchemaIdentifierNameResolver { - public static final String DEFAULT_CATALOG_NAME = "@opensearch"; + public static final String DEFAULT_DATASOURCE_NAME = "@opensearch"; public static final String DEFAULT_SCHEMA_NAME = "default"; public static final String INFORMATION_SCHEMA_NAME = "information_schema"; - private String catalogName = DEFAULT_CATALOG_NAME; + private String dataSourceName = DEFAULT_DATASOURCE_NAME; private String schemaName = DEFAULT_SCHEMA_NAME; private String identifierName; @@ -31,8 +31,8 @@ public class CatalogSchemaIdentifierNameResolver { * @param parts parts of qualifiedName. * @param allowedCatalogs allowedCatalogs. */ - public CatalogSchemaIdentifierNameResolver(List parts, Set allowedCatalogs) { - List remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs)); + public DataSourceSchemaIdentifierNameResolver(List parts, Set allowedCatalogs) { + List remainingParts = captureSchemaName(captureDataSourceName(parts, allowedCatalogs)); identifierName = String.join(DOT, remainingParts); } @@ -40,8 +40,8 @@ public String getIdentifierName() { return identifierName; } - public String getCatalogName() { - return catalogName; + public String getDataSourceName() { + return dataSourceName; } public String getSchemaName() { @@ -51,10 +51,10 @@ public String getSchemaName() { // Capture catalog name and return remaining parts(schema name and table name) // from the fully qualified name. - private List captureCatalogName(List parts, Set allowedCatalogs) { + private List captureDataSourceName(List parts, Set allowedCatalogs) { if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0)) - || DEFAULT_CATALOG_NAME.equals(parts.get(0))) { - catalogName = parts.get(0); + || DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) { + dataSourceName = parts.get(0); return parts.subList(1, parts.size()); } else { return parts; diff --git a/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java b/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java deleted file mode 100644 index 4c40920c7b..0000000000 --- a/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.catalog; - -import java.util.Set; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.storage.StorageEngine; - -/** - * Catalog Service manages catalogs. - */ -public interface CatalogService { - - /** - * Returns all catalog objects. - * - * @return Catalog Catalogs. - */ - Set getCatalogs(); - - /** - * Returns Catalog with corresponding to the catalog name. - * - * @param catalogName Name of the catalog. - * @return Catalog catalog. - */ - Catalog getCatalog(String catalogName); - - /** - * Default opensearch engine is not defined in catalog.json. - * So the registration of default catalog happens separately. - * - * @param storageEngine StorageEngine. - */ - void registerDefaultOpenSearchCatalog(StorageEngine storageEngine); - -} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java b/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java deleted file mode 100644 index a859090a5d..0000000000 --- a/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.catalog.model; - -import com.fasterxml.jackson.annotation.JsonFormat; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Map; -import lombok.Getter; -import lombok.Setter; - -@JsonIgnoreProperties(ignoreUnknown = true) -@Getter -@Setter -public class CatalogMetadata { - - @JsonProperty(required = true) - private String name; - - @JsonProperty(required = true) - @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) - private ConnectorType connector; - - @JsonProperty(required = true) - private Map properties; - -} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java b/core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java deleted file mode 100644 index b84c68adbf..0000000000 --- a/core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.catalog.model; - -public enum ConnectorType { - PROMETHEUS,OPENSEARCH -} diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java new file mode 100644 index 0000000000..6a2c7f46d3 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import java.util.Set; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; + +/** + * DataSource Service manage {@link DataSource}. + */ +public interface DataSourceService { + + /** + * Returns all DataSource objects. + * + * @return set of {@link DataSource}. + */ + Set getDataSources(); + + /** + * Returns {@link DataSource} with corresponding to the DataSource name. + * + * @param dataSourceName Name of the {@link DataSource}. + * @return {@link DataSource}. + */ + DataSource getDataSource(String dataSourceName); + + /** + * Register {@link DataSource} defined by {@link DataSourceMetadata}. + * + * @param dataSourceMetadata {@link DataSourceMetadata}. + */ + void addDataSource(DataSourceMetadata dataSourceMetadata); + + /** + * remove all the registered {@link DataSource}. + */ + void clear(); +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java new file mode 100644 index 0000000000..febf63e475 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/DataSourceServiceImpl.java @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +/** + * This class manages catalogs and responsible for creating connectors to these catalogs. + */ +public class DataSourceServiceImpl implements DataSourceService { + + private static String CATALOG_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; + + private static final Logger LOG = LogManager.getLogger(); + + private final ConcurrentHashMap dataSourceMap; + + private final Map dataSourceFactoryMap; + + /** + * Constructor. + */ + public DataSourceServiceImpl(Set dataSourceFactories) { + dataSourceFactoryMap = + dataSourceFactories.stream() + .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); + dataSourceMap = new ConcurrentHashMap<>(); + } + + @Override + public Set getDataSources() { + return new HashSet<>(dataSourceMap.values()); + } + + @Override + public DataSource getDataSource(String dataSourceName) { + if (!dataSourceMap.containsKey(dataSourceName)) { + throw new IllegalArgumentException( + String.format("DataSource with name %s doesn't exist.", dataSourceName)); + } + return dataSourceMap.get(dataSourceName); + } + + @Override + public void addDataSource(DataSourceMetadata metadata) { + validateDataSource(metadata); + dataSourceMap.put( + metadata.getName(), + dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); + } + + @Override + public void clear() { + dataSourceMap.clear(); + } + + /** + * This can be moved to a different validator class when we introduce more connectors. + * + * @param metadata {@link DataSourceMetadata}. + */ + private void validateDataSource(DataSourceMetadata metadata) { + if (StringUtils.isEmpty(metadata.getName())) { + throw new IllegalArgumentException( + "Missing Name Field from a DataSource. Name is a required parameter."); + } + if (!metadata.getName().matches(CATALOG_NAME_REGEX)) { + throw new IllegalArgumentException( + String.format( + "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", + metadata.getName())); + } + if (Objects.isNull(metadata.getProperties())) { + throw new IllegalArgumentException( + "Missing properties field in catalog configuration. Properties are required parameters."); + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java similarity index 77% rename from core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java rename to core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java index 5b7eaca523..2aad0c4a0a 100644 --- a/core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.catalog.model; +package org.opensearch.sql.datasource.model; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -15,11 +15,11 @@ @Getter @RequiredArgsConstructor @EqualsAndHashCode -public class Catalog { +public class DataSource { private final String name; - private final ConnectorType connectorType; + private final DataSourceType connectorType; @EqualsAndHashCode.Exclude private final StorageEngine storageEngine; diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java new file mode 100644 index 0000000000..967c221e24 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource.model; + +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.opensearch.sql.datasource.DataSourceService; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@EqualsAndHashCode +public class DataSourceMetadata { + + @JsonProperty(required = true) + private String name; + + @JsonProperty(required = true) + @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) + private DataSourceType connector; + + @JsonProperty(required = true) + private Map properties; + + /** + * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch + * {@link DataSource} to {@link DataSourceService}. + */ + public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME); + dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); + dataSourceMetadata.setProperties(ImmutableMap.of()); + return dataSourceMetadata; + } +} diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java new file mode 100644 index 0000000000..5a0023809c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource.model; + +/** + * Supported DataSource Type. + */ +public enum DataSourceType { + PROMETHEUS,OPENSEARCH +} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java b/core/src/main/java/org/opensearch/sql/datasource/model/auth/AuthenticationType.java similarity index 89% rename from core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java rename to core/src/main/java/org/opensearch/sql/datasource/model/auth/AuthenticationType.java index 1157d8e497..9cf3e01509 100644 --- a/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/auth/AuthenticationType.java @@ -5,12 +5,11 @@ * */ -package org.opensearch.sql.catalog.model.auth; +package org.opensearch.sql.datasource.model.auth; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public enum AuthenticationType { diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java similarity index 76% rename from core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java rename to core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java index 4e6a87e21b..c0b3a5bba7 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java @@ -5,13 +5,13 @@ * */ -package org.opensearch.sql.planner.physical.catalog; +package org.opensearch.sql.planner.physical.datasource; import com.google.common.annotations.VisibleForTesting; import java.util.Map; import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.planner.DefaultImplementor; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -28,13 +28,13 @@ */ @RequiredArgsConstructor @EqualsAndHashCode -public class CatalogTable implements Table { +public class DataSourceTable implements Table { - private final CatalogService catalogService; + private final DataSourceService catalogService; @Override public Map getFieldTypes() { - return CatalogTableSchema.CATALOG_TABLE_SCHEMA.getMapping(); + return DataSourceTableSchema.DATA_SOURCE_TABLE_SCHEMA.getMapping(); } @Override @@ -47,11 +47,11 @@ public PhysicalPlan implement(LogicalPlan plan) { public static class CatalogTableDefaultImplementor extends DefaultImplementor { - private final CatalogService catalogService; + private final DataSourceService catalogService; @Override public PhysicalPlan visitRelation(LogicalRelation node, Object context) { - return new CatalogTableScan(catalogService); + return new DataSourceTableScan(catalogService); } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java similarity index 77% rename from core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java rename to core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java index efc59c97ec..6092f35f33 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.planner.physical.catalog; +package org.opensearch.sql.planner.physical.datasource; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; @@ -14,8 +14,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Set; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -28,13 +28,13 @@ * persisting catalog info somewhere. * */ -public class CatalogTableScan extends TableScanOperator { +public class DataSourceTableScan extends TableScanOperator { - private final CatalogService catalogService; + private final DataSourceService catalogService; private Iterator iterator; - public CatalogTableScan(CatalogService catalogService) { + public DataSourceTableScan(DataSourceService catalogService) { this.catalogService = catalogService; this.iterator = Collections.emptyIterator(); } @@ -47,8 +47,8 @@ public String explain() { @Override public void open() { List exprValues = new ArrayList<>(); - Set catalogs = catalogService.getCatalogs(); - for (Catalog catalog : catalogs) { + Set catalogs = catalogService.getDataSources(); + for (DataSource catalog : catalogs) { exprValues.add( new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( "DATASOURCE_NAME", ExprValueUtils.stringValue(catalog.getName()), diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableSchema.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java similarity index 79% rename from core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableSchema.java rename to core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java index b360eb87db..47cb136439 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableSchema.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.planner.physical.catalog; +package org.opensearch.sql.planner.physical.datasource; import static org.opensearch.sql.data.type.ExprCoreType.STRING; @@ -18,9 +18,9 @@ */ @Getter @RequiredArgsConstructor -public enum CatalogTableSchema { +public enum DataSourceTableSchema { - CATALOG_TABLE_SCHEMA(new LinkedHashMap<>() { + DATA_SOURCE_TABLE_SCHEMA(new LinkedHashMap<>() { { put("DATASOURCE_NAME", STRING); put("CONNECTOR_TYPE", STRING); diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java new file mode 100644 index 0000000000..feb6d20f3b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java @@ -0,0 +1,28 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.storage; + +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +/** + * Each builtin {@link DataSourceType} mapping to DataSourceFactory implementation. + * DataSourceFactory is used to create {@link DataSource} based on configuration. + */ +public interface DataSourceFactory { + /** + * Get {@link DataSourceType}. + */ + DataSourceType getDataSourceType(); + + /** + * Create {@link DataSource}. + */ + DataSource createDataSource(DataSourceMetadata metadata); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java deleted file mode 100644 index 4cc27f6fa0..0000000000 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.storage; - -import java.util.Map; -import org.opensearch.sql.catalog.model.ConnectorType; - -public interface StorageEngineFactory { - - ConnectorType getConnectorType(); - - StorageEngine getStorageEngine(String catalogName, Map requiredConfig); - -} diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index 97c560d505..e4facf99e7 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -10,7 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; import static org.opensearch.sql.ast.dsl.AstDSL.alias; import static org.opensearch.sql.ast.dsl.AstDSL.argument; @@ -88,7 +88,7 @@ import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; -import org.opensearch.sql.planner.physical.catalog.CatalogTable; +import org.opensearch.sql.planner.physical.datasource.DataSourceTable; import org.springframework.context.annotation.Configuration; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; @@ -174,7 +174,7 @@ public void filter_relation_with_information_schema_and_os_catalog() { dsl.equal(DSL.ref("integer_value", INTEGER), DSL.literal(integerValue(1)))), AstDSL.filter( AstDSL.relation( - AstDSL.qualifiedName(DEFAULT_CATALOG_NAME, "information_schema", "tables")), + AstDSL.qualifiedName(DEFAULT_DATASOURCE_NAME, "information_schema", "tables")), AstDSL.equalTo(AstDSL.field("integer_value"), AstDSL.intLiteral(1)))); } @@ -1078,7 +1078,7 @@ public void table_function_with_wrong_table_function() { @Test public void show_catalogs() { - assertAnalyzeEqual(new LogicalRelation(".CATALOGS", new CatalogTable(catalogService)), + assertAnalyzeEqual(new LogicalRelation(".CATALOGS", new DataSourceTable(catalogService)), AstDSL.relation(qualifiedName(".CATALOGS"))); } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index 447802c963..f119d2a4cc 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -15,14 +15,14 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.sql.CatalogSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; import org.opensearch.sql.analysis.symbol.Symbol; import org.opensearch.sql.analysis.symbol.SymbolTable; import org.opensearch.sql.ast.tree.UnresolvedPlan; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.config.TestConfig; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.exception.ExpressionEvaluationException; @@ -96,8 +96,8 @@ public PhysicalPlan implement(LogicalPlan plan) { } @Bean - protected CatalogService catalogService() { - return new DefaultCatalogService(); + protected DataSourceService catalogService() { + return new DefaultDataSourceService(); } @@ -143,16 +143,17 @@ protected Environment typeEnv() { protected Table table; @Autowired - protected CatalogService catalogService; + protected DataSourceService catalogService; @Autowired protected Environment typeEnv; @Bean - protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer, CatalogService catalogService, - StorageEngine storageEngine, BuiltinFunctionRepository functionRepository, - Table table) { - catalogService.registerDefaultOpenSearchCatalog(storageEngine); + protected Analyzer analyzer( + ExpressionAnalyzer expressionAnalyzer, + DataSourceService dataSourceService, + BuiltinFunctionRepository functionRepository, + Table table) { functionRepository.register("prometheus", new FunctionResolver() { @Override @@ -170,7 +171,7 @@ public FunctionName getFunctionName() { return FunctionName.of("query_range"); } }); - return new Analyzer(expressionAnalyzer, catalogService, functionRepository); + return new Analyzer(expressionAnalyzer, dataSourceService, functionRepository); } @Bean @@ -196,26 +197,31 @@ protected LogicalPlan analyze(UnresolvedPlan unresolvedPlan) { return analyzer.analyze(unresolvedPlan, analysisContext); } - private class DefaultCatalogService implements CatalogService { + private class DefaultDataSourceService implements DataSourceService { private StorageEngine storageEngine = storageEngine(); - private final Catalog catalog - = new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine); + private final DataSource catalog + = new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine); @Override - public Set getCatalogs() { + public Set getDataSources() { return ImmutableSet.of(catalog); } @Override - public Catalog getCatalog(String catalogName) { + public DataSource getDataSource(String dataSourceName) { return catalog; } @Override - public void registerDefaultOpenSearchCatalog(StorageEngine storageEngine) { - this.storageEngine = storageEngine; + public void addDataSource(DataSourceMetadata metadata) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/model/CatalogSchemaIdentifierNameResolverTest.java b/core/src/test/java/org/opensearch/sql/analysis/model/CatalogSchemaIdentifierNameResolverTest.java deleted file mode 100644 index 069a1d814f..0000000000 --- a/core/src/test/java/org/opensearch/sql/analysis/model/CatalogSchemaIdentifierNameResolverTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.analysis.model; - - -import java.util.Arrays; -import java.util.Collections; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver; - -public class CatalogSchemaIdentifierNameResolverTest { - - @Test - void testFullyQualifiedName() { - CatalogSchemaIdentifierNameResolver - catalogSchemaIdentifierNameResolver = new CatalogSchemaIdentifierNameResolver( - Arrays.asList("prom", "information_schema", "tables"), Collections.singleton("prom")); - Assertions.assertEquals("information_schema", - catalogSchemaIdentifierNameResolver.getSchemaName()); - Assertions.assertEquals("prom", catalogSchemaIdentifierNameResolver.getCatalogName()); - Assertions.assertEquals("tables", catalogSchemaIdentifierNameResolver.getIdentifierName()); - } - -} diff --git a/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java b/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java new file mode 100644 index 0000000000..7d7c92f3ed --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/analysis/model/DataSourceSchemaIdentifierNameResolverTest.java @@ -0,0 +1,30 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.analysis.model; + + +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver; + +public class DataSourceSchemaIdentifierNameResolverTest { + + @Test + void testFullyQualifiedName() { + DataSourceSchemaIdentifierNameResolver + dataSourceSchemaIdentifierNameResolver = new DataSourceSchemaIdentifierNameResolver( + Arrays.asList("prom", "information_schema", "tables"), Collections.singleton("prom")); + Assertions.assertEquals("information_schema", + dataSourceSchemaIdentifierNameResolver.getSchemaName()); + Assertions.assertEquals("prom", dataSourceSchemaIdentifierNameResolver.getDataSourceName()); + Assertions.assertEquals("tables", dataSourceSchemaIdentifierNameResolver.getIdentifierName()); + } + +} diff --git a/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java new file mode 100644 index 0000000000..4255d8e5ca --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/datasource/DataSourceServiceImplTest.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.datasource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; + +import com.google.common.collect.ImmutableMap; +import java.util.HashSet; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.sql.storage.StorageEngine; + +@ExtendWith(MockitoExtension.class) +class DataSourceServiceImplTest { + + static final String NAME = "opensearch"; + + @Mock private DataSourceFactory dataSourceFactory; + + @Mock private StorageEngine storageEngine; + + private DataSourceService dataSourceService; + + @BeforeEach + public void setup() { + lenient() + .doAnswer( + invocation -> { + DataSourceMetadata metadata = invocation.getArgument(0); + return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine); + }) + .when(dataSourceFactory) + .createDataSource(any()); + when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH); + dataSourceService = + new DataSourceServiceImpl( + new HashSet<>() { + { + add(dataSourceFactory); + } + }); + } + + @Test + void getDataSourceSuccess() { + dataSourceService.addDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + + assertEquals( + new DataSource(DEFAULT_DATASOURCE_NAME, DataSourceType.OPENSEARCH, storageEngine), + dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME)); + } + + @Test + void getNotExistDataSourceShouldFail() { + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> dataSourceService.getDataSource("mock")); + assertEquals("DataSource with name mock doesn't exist.", exception.getMessage()); + } + + @Test + void getAddDataSourcesShouldSuccess() { + assertEquals(0, dataSourceService.getDataSources().size()); + + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + } + + @Test + void noDataSourceExistAfterClear() { + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + + dataSourceService.clear(); + assertEquals(0, dataSourceService.getDataSources().size()); + } + + @Test + void metaDataMissingNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata(null, DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "Missing Name Field from a DataSource. Name is a required parameter.", + exception.getMessage()); + } + + @Test + void metaDataHasIllegalDataSourceNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata("prometheus.test", DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "DataSource Name: prometheus.test contains illegal characters. " + + "Allowed characters: a-zA-Z0-9_-*@.", + exception.getMessage()); + } + + @Test + void metaDataMissingPropertiesShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null))); + assertEquals( + "Missing properties field in catalog configuration. Properties are required parameters.", + exception.getMessage()); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java index 4207c7d31b..65b0093108 100644 --- a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java @@ -12,7 +12,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; @@ -81,7 +81,7 @@ public void planner_test() { LogicalPlanDSL.filter( LogicalPlanDSL.relation("schema", storageEngine.getTable( - new CatalogSchemaName(DEFAULT_CATALOG_NAME, "default"), + new CatalogSchemaName(DEFAULT_DATASOURCE_NAME, "default"), "schema")), dsl.equal(DSL.ref("response", INTEGER), DSL.literal(10)) ), diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java similarity index 55% rename from core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java rename to core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index cf9b5fe016..5c1bda2497 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.planner.physical.catalog; +package org.opensearch.sql.planner.physical.datasource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -21,49 +21,49 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.storage.StorageEngine; @ExtendWith(MockitoExtension.class) -public class CatalogTableScanTest { +public class DataSourceTableScanTest { @Mock - private CatalogService catalogService; + private DataSourceService catalogService; @Mock private StorageEngine storageEngine; - private CatalogTableScan catalogTableScan; + private DataSourceTableScan dataSourceTableScan; @BeforeEach private void setUp() { - catalogTableScan = new CatalogTableScan(catalogService); + dataSourceTableScan = new DataSourceTableScan(catalogService); } @Test void testExplain() { - assertEquals("GetCatalogRequestRequest{}", catalogTableScan.explain()); + assertEquals("GetCatalogRequestRequest{}", dataSourceTableScan.explain()); } @Test void testIterator() { - Set catalogSet = new HashSet<>(); - catalogSet.add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - catalogSet.add(new Catalog("opensearch", ConnectorType.OPENSEARCH, storageEngine)); - when(catalogService.getCatalogs()).thenReturn(catalogSet); + Set catalogSet = new HashSet<>(); + catalogSet.add(new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine)); + catalogSet.add(new DataSource("opensearch", DataSourceType.OPENSEARCH, storageEngine)); + when(catalogService.getDataSources()).thenReturn(catalogSet); - assertFalse(catalogTableScan.hasNext()); - catalogTableScan.open(); - assertTrue(catalogTableScan.hasNext()); - for (Catalog catalog : catalogSet) { + assertFalse(dataSourceTableScan.hasNext()); + dataSourceTableScan.open(); + assertTrue(dataSourceTableScan.hasNext()); + for (DataSource catalog : catalogSet) { assertEquals(new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( "DATASOURCE_NAME", ExprValueUtils.stringValue(catalog.getName()), "CONNECTOR_TYPE", ExprValueUtils.stringValue(catalog.getConnectorType().name())))), - catalogTableScan.next()); + dataSourceTableScan.next()); } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java similarity index 69% rename from core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java rename to core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java index 1c069005f0..caacc63666 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableTest.java @@ -5,7 +5,7 @@ * */ -package org.opensearch.sql.planner.physical.catalog; +package org.opensearch.sql.planner.physical.datasource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -17,22 +17,22 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.physical.PhysicalPlan; @ExtendWith(MockitoExtension.class) -public class CatalogTableTest { +public class DataSourceTableTest { @Mock - private CatalogService catalogService; + private DataSourceService catalogService; @Test void testGetFieldTypes() { - CatalogTable catalogTable = new CatalogTable(catalogService); - Map fieldTypes = catalogTable.getFieldTypes(); + DataSourceTable dataSourceTable = new DataSourceTable(catalogService); + Map fieldTypes = dataSourceTable.getFieldTypes(); Map expectedTypes = new HashMap<>(); expectedTypes.put("DATASOURCE_NAME", ExprCoreType.STRING); expectedTypes.put("CONNECTOR_TYPE", ExprCoreType.STRING); @@ -41,10 +41,10 @@ void testGetFieldTypes() { @Test void testImplement() { - CatalogTable catalogTable = new CatalogTable(catalogService); + DataSourceTable dataSourceTable = new DataSourceTable(catalogService); PhysicalPlan physicalPlan - = catalogTable.implement(LogicalPlanDSL.relation(".CATALOGS", catalogTable)); - assertTrue(physicalPlan instanceof CatalogTableScan); + = dataSourceTable.implement(LogicalPlanDSL.relation(".CATALOGS", dataSourceTable)); + assertTrue(physicalPlan instanceof DataSourceTableScan); } // todo. temporary added for code coverage. remove if required. @@ -52,7 +52,7 @@ void testImplement() { void testExist() { UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, - () -> new CatalogTable(catalogService).exists()); + () -> new DataSourceTable(catalogService).exists()); assertEquals("Unsupported Operation", exception.getMessage()); } @@ -61,7 +61,7 @@ void testExist() { void testCreateTable() { UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, - () -> new CatalogTable(catalogService).create(new HashMap<>())); + () -> new DataSourceTable(catalogService).create(new HashMap<>())); assertEquals("Unsupported Operation", exception.getMessage()); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index bcd0c0ffb8..e5a743a45d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -6,9 +6,11 @@ package org.opensearch.sql.ppl; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -20,7 +22,8 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.DefaultQueryManager; @@ -37,14 +40,14 @@ import org.opensearch.sql.opensearch.client.OpenSearchRestClient; import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.sql.storage.DataSourceFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -75,9 +78,12 @@ public void init() { new OpenSearchExecutionProtector(new AlwaysHealthyMonitor()))); context.registerBean(OpenSearchClient.class, () -> client); context.registerBean(Settings.class, () -> defaultSettings()); - OpenSearchStorageEngine openSearchStorageEngine = new OpenSearchStorageEngine(client, defaultSettings()); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(openSearchStorageEngine); - context.registerBean(CatalogService.class, CatalogServiceImpl::getInstance); + DataSourceService dataSourceService = new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory(client, defaultSettings())) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + context.registerBean(DataSourceService.class, () -> dataSourceService); context.register(StandaloneConfig.class); context.register(PPLServiceConfig.class); context.refresh(); @@ -169,7 +175,7 @@ public InternalRestHighLevelClient(RestClient restClient) { @Import({ExpressionConfig.class}) static class StandaloneConfig { @Autowired - private CatalogService catalogService; + private DataSourceService catalogService; @Autowired private ExecutionEngine executionEngine; @@ -183,7 +189,7 @@ QueryManager queryManager() { @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { catalogService - .getCatalogs() + .getDataSources() .forEach( catalog -> catalog diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java index 4c9afe802e..3eafbd32d9 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java @@ -27,7 +27,7 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryManager; @@ -66,7 +66,7 @@ public void setup() { context = new AnnotationConfigApplicationContext(); context.registerBean(StorageEngine.class, () -> Mockito.mock(StorageEngine.class)); context.registerBean(ExecutionEngine.class, () -> Mockito.mock(ExecutionEngine.class)); - context.registerBean(CatalogService.class, () -> Mockito.mock(CatalogService.class)); + context.registerBean(DataSourceService.class, () -> Mockito.mock(DataSourceService.class)); context.registerBean(QueryManager.class, () -> queryManager); context.registerBean(QueryPlanFactory.class, () -> factory); context.register(SQLServiceConfig.class); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java new file mode 100644 index 0000000000..db6b626c29 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.storage.DataSourceFactory; + +@RequiredArgsConstructor +public class OpenSearchDataSourceFactory implements DataSourceFactory { + + /** OpenSearch client connection. */ + private final OpenSearchClient client; + + private final Settings settings; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.OPENSEARCH; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource(metadata.getName(), DataSourceType.OPENSEARCH, + new OpenSearchStorageEngine(client, settings)); + } +} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java index dd660d54a1..5c9284e980 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngineTest.java @@ -8,7 +8,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME; import static org.opensearch.sql.utils.SystemIndexUtils.TABLE_INFO; import org.junit.jupiter.api.Test; @@ -33,14 +33,14 @@ class OpenSearchStorageEngineTest { @Test public void getTable() { OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings); - Table table = engine.getTable(new CatalogSchemaName(DEFAULT_CATALOG_NAME, "default"), "test"); + Table table = engine.getTable(new CatalogSchemaName(DEFAULT_DATASOURCE_NAME, "default"), "test"); assertNotNull(table); } @Test public void getSystemTable() { OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings); - Table table = engine.getTable(new CatalogSchemaName(DEFAULT_CATALOG_NAME, "default"), + Table table = engine.getTable(new CatalogSchemaName(DEFAULT_DATASOURCE_NAME, "default"), TABLE_INFO); assertNotNull(table); assertTrue(table instanceof OpenSearchSystemIndex); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index e1d29a5775..bdabcb5a17 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,13 +5,27 @@ package org.opensearch.sql.plugin; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Objects; +import java.util.Set; import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; @@ -40,7 +54,10 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.DataSourceServiceImpl; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; @@ -50,12 +67,11 @@ import org.opensearch.sql.opensearch.security.SecurityAccess; import org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; -import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; -import org.opensearch.sql.plugin.catalog.CatalogSettings; import org.opensearch.sql.plugin.config.OpenSearchPluginConfig; +import org.opensearch.sql.plugin.datasource.DataSourceSettings; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; @@ -63,8 +79,9 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.config.PPLServiceConfig; +import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; -import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -73,6 +90,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin { + private static final Logger LOG = LogManager.getLogger(); + private ClusterService clusterService; /** @@ -84,6 +103,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Rel private AnnotationConfigApplicationContext applicationContext; + private DataSourceService dataSourceService; + public String name() { return "sql"; } @@ -142,8 +163,15 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; - CatalogServiceImpl.getInstance().loadConnectors(clusterService.getSettings()); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(openSearchStorageEngine()); + this.dataSourceService = + new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory( + new OpenSearchNodeClient(this.client), pluginSettings)) + .add(new PrometheusStorageFactory()) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + loadDataSources(dataSourceService, clusterService.getSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -155,7 +183,7 @@ public Collection createComponents( applicationContext.registerBean( org.opensearch.sql.common.setting.Settings.class, () -> pluginSettings); applicationContext.registerBean( - CatalogService.class, () -> CatalogServiceImpl.getInstance()); + DataSourceService.class, () -> dataSourceService); applicationContext.register(OpenSearchPluginConfig.class); applicationContext.register(PPLServiceConfig.class); applicationContext.register(SQLServiceConfig.class); @@ -184,7 +212,7 @@ public List> getSettings() { return new ImmutableList.Builder>() .addAll(LegacyOpenDistroSettings.legacySettings()) .addAll(OpenSearchSettings.pluginSettings()) - .add(CatalogSettings.CATALOG_CONFIG) + .add(DataSourceSettings.CATALOG_CONFIG) .build(); } @@ -195,12 +223,49 @@ public ScriptEngine getScriptEngine(Settings settings, Collection { + InputStream inputStream = DataSourceSettings.CATALOG_CONFIG.get(settings); + if (inputStream != null) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + List metadataList = + objectMapper.readValue(inputStream, new TypeReference<>() {}); + verifyDuplicateName(metadataList); + metadataList.forEach(metadata -> dataSourceService.addDataSource(metadata)); + } catch (IOException e) { + LOG.error( + "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e); + } catch (Throwable e) { + LOG.error("DataSource construction failed.", e); + } + } + return null; + }); } + static void verifyDuplicateName(List metadataList) { + Set seenNames = new HashSet<>(); + for (DataSourceMetadata metadata : metadataList) { + if (seenNames.contains(metadata.getName())) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Datasource name should be unique, Duplicate datasource found %s", + metadata.getName())); + } + seenNames.add(metadata.getName()); + } + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java deleted file mode 100644 index 3c6e0e5281..0000000000 --- a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.catalog; - -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.CatalogMetadata; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.opensearch.security.SecurityAccess; -import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; - -/** - * This class manages catalogs and responsible for creating connectors to these catalogs. - */ -public class CatalogServiceImpl implements CatalogService { - - private static final CatalogServiceImpl INSTANCE = new CatalogServiceImpl(); - - private static final String CATALOG_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - - private static final Logger LOG = LogManager.getLogger(); - - private Map catalogMap = new HashMap<>(); - - private final Map connectorTypeStorageEngineFactoryMap; - - public static CatalogServiceImpl getInstance() { - return INSTANCE; - } - - private CatalogServiceImpl() { - connectorTypeStorageEngineFactoryMap = new HashMap<>(); - PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - connectorTypeStorageEngineFactoryMap.put(prometheusStorageFactory.getConnectorType(), - prometheusStorageFactory); - } - - /** - * This function reads settings and loads connectors to the data stores. - * This will be invoked during start up and also when settings are updated. - * - * @param settings settings. - */ - public void loadConnectors(Settings settings) { - SecurityAccess.doPrivileged(() -> { - InputStream inputStream = CatalogSettings.CATALOG_CONFIG.get(settings); - if (inputStream != null) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - try { - List catalogs = - objectMapper.readValue(inputStream, new TypeReference<>() { - }); - validateCatalogs(catalogs); - constructConnectors(catalogs); - } catch (IOException e) { - LOG.error("Catalog Configuration File uploaded is malformed. Verify and re-upload.", e); - } catch (Throwable e) { - LOG.error("Catalog construction failed.", e); - } - } - return null; - }); - } - - @Override - public Set getCatalogs() { - return new HashSet<>(catalogMap.values()); - } - - @Override - public Catalog getCatalog(String catalogName) { - if (!catalogMap.containsKey(catalogName)) { - throw new IllegalArgumentException( - String.format("Catalog with name %s doesn't exist.", catalogName)); - } - return catalogMap.get(catalogName); - } - - - @Override - public void registerDefaultOpenSearchCatalog(StorageEngine storageEngine) { - if (storageEngine == null) { - throw new IllegalArgumentException("Default storage engine can't be null"); - } - catalogMap.put(DEFAULT_CATALOG_NAME, - new Catalog(DEFAULT_CATALOG_NAME, ConnectorType.OPENSEARCH, storageEngine)); - } - - private StorageEngine createStorageEngine(CatalogMetadata catalog) { - ConnectorType connector = catalog.getConnector(); - switch (connector) { - case PROMETHEUS: - return connectorTypeStorageEngineFactoryMap - .get(catalog.getConnector()) - .getStorageEngine(catalog.getName(), catalog.getProperties()); - default: - throw new IllegalStateException( - String.format("Unsupported Connector: %s", connector.name())); - } - } - - private void constructConnectors(List catalogs) { - catalogMap = new HashMap<>(); - for (CatalogMetadata catalog : catalogs) { - try { - String catalogName = catalog.getName(); - StorageEngine storageEngine = createStorageEngine(catalog); - catalogMap.put(catalogName, - new Catalog(catalog.getName(), catalog.getConnector(), storageEngine)); - } catch (Throwable e) { - LOG.error("Catalog : {} storage engine creation failed with the following message: {}", - catalog.getName(), e.getMessage(), e); - } - } - } - - /** - * This can be moved to a different validator class - * when we introduce more connectors. - * - * @param catalogs catalogs. - */ - private void validateCatalogs(List catalogs) { - - Set reviewedCatalogs = new HashSet<>(); - for (CatalogMetadata catalog : catalogs) { - - if (StringUtils.isEmpty(catalog.getName())) { - throw new IllegalArgumentException( - "Missing Name Field from a catalog. Name is a required parameter."); - } - - if (!catalog.getName().matches(CATALOG_NAME_REGEX)) { - throw new IllegalArgumentException( - String.format("Catalog Name: %s contains illegal characters." - + " Allowed characters: a-zA-Z0-9_-*@ ", catalog.getName())); - } - - String catalogName = catalog.getName(); - if (reviewedCatalogs.contains(catalogName)) { - throw new IllegalArgumentException("Catalogs with same name are not allowed."); - } else { - reviewedCatalogs.add(catalogName); - } - - if (Objects.isNull(catalog.getProperties())) { - throw new IllegalArgumentException("Missing properties field in catalog configuration. " - + "Properties are required parameters"); - } - - } - } - - -} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java index 596296522c..ee64c554b6 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java @@ -12,8 +12,8 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryManager; import org.opensearch.sql.executor.QueryService; @@ -56,7 +56,7 @@ public class OpenSearchPluginConfig { private Settings settings; @Autowired - private CatalogService catalogService; + private DataSourceService catalogService; @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) @@ -103,7 +103,7 @@ public QueryManager queryManager() { @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { catalogService - .getCatalogs() + .getDataSources() .forEach( catalog -> catalog diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogSettings.java b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java similarity index 82% rename from plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogSettings.java rename to plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java index 558e7558ca..a53984ad6b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogSettings.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/datasource/DataSourceSettings.java @@ -3,13 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.plugin.catalog; +package org.opensearch.sql.plugin.datasource; import java.io.InputStream; import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; -public class CatalogSettings { +public class DataSourceSettings { public static final Setting CATALOG_CONFIG = SecureSetting.secureFile( "plugins.query.federation.datasources.config", diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java deleted file mode 100644 index cdbce55cb1..0000000000 --- a/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.catalog; - -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashSet; -import java.util.Set; -import lombok.SneakyThrows; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.common.settings.MockSecureSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.storage.StorageEngine; - -@RunWith(MockitoJUnitRunner.class) -public class CatalogServiceImplTest { - - public static final String CATALOG_SETTING_METADATA_KEY = - "plugins.query.federation.datasources.config"; - - @Mock - private StorageEngine storageEngine; - - @SneakyThrows - @Test - public void testLoadConnectors() { - Settings settings = getCatalogSettings("catalogs.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - - @SneakyThrows - @Test - public void testLoadConnectorsWithMultipleCatalogs() { - Settings settings = getCatalogSettings("multiple_catalogs.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - add(new Catalog("prometheus-1", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMissingName() { - Settings settings = getCatalogSettings("catalog_missing_name.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithDuplicateCatalogNames() { - Settings settings = getCatalogSettings("duplicate_catalog_names.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMalformedJson() { - Settings settings = getCatalogSettings("malformed_catalogs.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testGetStorageEngineAfterGetCatalogs() { - Settings settings = getCatalogSettings("empty_catalog.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(storageEngine); - Set expected = new HashSet<>(); - expected.add(new Catalog(DEFAULT_CATALOG_NAME, ConnectorType.OPENSEARCH, storageEngine)); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - Assert.assertEquals(storageEngine, - CatalogServiceImpl.getInstance().getCatalog(DEFAULT_CATALOG_NAME).getStorageEngine()); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - Assert.assertEquals(storageEngine, - CatalogServiceImpl.getInstance().getCatalog(DEFAULT_CATALOG_NAME).getStorageEngine()); - IllegalArgumentException illegalArgumentException - = Assert.assertThrows(IllegalArgumentException.class, - () -> CatalogServiceImpl.getInstance().getCatalog("test")); - Assert.assertEquals("Catalog with name test doesn't exist.", - illegalArgumentException.getMessage()); - } - - - @SneakyThrows - @Test - public void testGetStorageEngineAfterLoadingConnectors() { - Settings settings = getCatalogSettings("empty_catalog.json"); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(storageEngine); - //Load Connectors will empty the catalogMap.So OpenSearch Storage Engine - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>(); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithIllegalCatalogNames() { - Settings settings = getCatalogSettings("illegal_catalog_name.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException { - MockSecureSettings mockSecureSettings = new MockSecureSettings(); - ClassLoader classLoader = getClass().getClassLoader(); - Path filepath = Paths.get(classLoader.getResource(filename).toURI()); - mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); - return Settings.builder().setSecureSettings(mockSecureSettings).build(); - } - -} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java new file mode 100644 index 0000000000..00445824de --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.common.settings.MockSecureSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.plugin.SQLPlugin; + +@RunWith(MockitoJUnitRunner.class) +public class CatalogServiceImplTest { + + public static final String CATALOG_SETTING_METADATA_KEY = + "plugins.query.federation.datasources.config"; + + @Mock + private DataSourceService dataSourceService; + + @SneakyThrows + @Test + public void testLoadConnectors() { + Settings settings = getCatalogSettings("catalogs.json"); + loadConnectors(settings); + List expected = + new ArrayList<>() { + { + add( + metadata( + "prometheus", + DataSourceType.PROMETHEUS, + ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type"))); + } + }; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMultipleCatalogs() { + Settings settings = getCatalogSettings("multiple_catalogs.json"); + loadConnectors(settings); + List expected = new ArrayList<>() {{ + add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type" + ))); + add(metadata("prometheus-1", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "awssigv4", + "prometheus.auth.region", "us-east-1", + "prometheus.auth.access_key", "accessKey", + "prometheus.auth.secret_key", "secretKey" + ))); + }}; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithDuplicateCatalogNames() { + Settings settings = getCatalogSettings("duplicate_catalog_names.json"); + loadConnectors(settings); + + verify(dataSourceService, never()).addDataSource(any()); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMalformedJson() { + Settings settings = getCatalogSettings("malformed_catalogs.json"); + loadConnectors(settings); + + verify(dataSourceService, never()).addDataSource(any()); + } + + private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + ClassLoader classLoader = getClass().getClassLoader(); + Path filepath = Paths.get(classLoader.getResource(filename).toURI()); + mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); + return Settings.builder().setSecureSettings(mockSecureSettings).build(); + } + + void loadConnectors(Settings settings) { + SQLPlugin.loadDataSources(dataSourceService, settings); + } + + void verifyAddDataSourceWithMetadata(List metadataList) { + int expectCount = metadataList.size(); + ArgumentCaptor metadataCaptor = + ArgumentCaptor.forClass(DataSourceMetadata.class); + verify(dataSourceService, times(expectCount)).addDataSource(metadataCaptor.capture()); + List actualValues = metadataCaptor.getAllValues(); + assertEquals(metadataList, actualValues); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index ef8ec25df8..fca004196f 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -16,7 +16,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; @@ -52,7 +52,7 @@ public class PPLServiceTest { private ExecutionEngine executionEngine; @Mock - private CatalogService catalogService; + private DataSourceService catalogService; @Mock private ExecutionEngine.Schema schema; @@ -66,7 +66,7 @@ public void setUp() { context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.registerBean(StorageEngine.class, () -> storageEngine); context.registerBean(ExecutionEngine.class, () -> executionEngine); - context.registerBean(CatalogService.class, () -> catalogService); + context.registerBean(DataSourceService.class, () -> catalogService); context.register(PPLServiceConfig.class); context.refresh(); pplService = context.getBean(PPLService.class); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java index f8ae0936ee..42a118fadc 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java @@ -7,7 +7,7 @@ package org.opensearch.sql.prometheus.storage; -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.INFORMATION_SCHEMA_NAME; +import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.INFORMATION_SCHEMA_NAME; import static org.opensearch.sql.utils.SystemIndexUtils.isSystemIndex; import java.util.Collection; diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index 41cbf3748f..0c73afa8f3 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -14,16 +14,18 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.catalog.model.auth.AuthenticationType; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasource.model.auth.AuthenticationType; import org.opensearch.sql.prometheus.authinterceptors.AwsSigningInterceptor; import org.opensearch.sql.prometheus.authinterceptors.BasicAuthenticationInterceptor; import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.client.PrometheusClientImpl; import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; +import org.opensearch.sql.storage.DataSourceFactory; -public class PrometheusStorageFactory implements StorageEngineFactory { +public class PrometheusStorageFactory implements DataSourceFactory { public static final String URI = "prometheus.uri"; public static final String AUTH_TYPE = "prometheus.auth.type"; @@ -33,14 +35,20 @@ public class PrometheusStorageFactory implements StorageEngineFactory { public static final String ACCESS_KEY = "prometheus.auth.access_key"; public static final String SECRET_KEY = "prometheus.auth.secret_key"; - @Override - public ConnectorType getConnectorType() { - return ConnectorType.PROMETHEUS; + public DataSourceType getDataSourceType() { + return DataSourceType.PROMETHEUS; } @Override - public StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource( + metadata.getName(), + DataSourceType.PROMETHEUS, + getStorageEngine(metadata.getName(), metadata.getProperties())); + } + + StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { validateFieldsInConfig(requiredConfig, Set.of(URI)); PrometheusClient prometheusClient; try { diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index 1b54cde5d9..ff5d1ac977 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -13,7 +13,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.storage.StorageEngine; @ExtendWith(MockitoExtension.class) @@ -22,7 +22,7 @@ public class PrometheusStorageFactoryTest { @Test void testGetConnectorType() { PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - Assertions.assertEquals(ConnectorType.PROMETHEUS, prometheusStorageFactory.getConnectorType()); + Assertions.assertEquals(DataSourceType.PROMETHEUS, prometheusStorageFactory.getDataSourceType()); } @Test