diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 7d0a452e1b..365c006f8c 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -11,18 +11,10 @@ import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC; import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC; import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; -import static org.opensearch.sql.utils.MLCommonsConstants.ACTION; -import static org.opensearch.sql.utils.MLCommonsConstants.MODELID; -import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE; import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE; -import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP; -import static org.opensearch.sql.utils.MLCommonsConstants.STATUS; -import static org.opensearch.sql.utils.MLCommonsConstants.TASKID; import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD; -import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN; -import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT; import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME; import com.google.common.collect.ImmutableList; @@ -69,8 +61,8 @@ import org.opensearch.sql.ast.tree.TableFunction; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Values; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSource; import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.exception.SemanticCheckException; @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor private final NamedExpressionAnalyzer namedExpressionAnalyzer; - private final CatalogService catalogService; + private final DataSourceService catalogService; private final BuiltinFunctionRepository repository; @@ -126,7 +118,7 @@ public class Analyzer extends AbstractNodeVisitor */ public Analyzer( ExpressionAnalyzer expressionAnalyzer, - CatalogService catalogService, + DataSourceService catalogService, BuiltinFunctionRepository repository) { this.expressionAnalyzer = expressionAnalyzer; this.catalogService = catalogService; @@ -142,9 +134,9 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) { @Override public LogicalPlan visitRelation(Relation node, AnalysisContext context) { QualifiedName qualifiedName = node.getTableQualifiedName(); - Set allowedCatalogNames = catalogService.getCatalogs() + Set allowedCatalogNames = catalogService.getDataSources() .stream() - .map(Catalog::getName) + .map(DataSource::getName) .collect(Collectors.toSet()); CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver = new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); @@ -156,7 +148,7 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) { table = new CatalogTable(catalogService); } else { table = catalogService - .getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName()) + .getDataSource(catalogSchemaIdentifierNameResolver.getCatalogName()) .getStorageEngine() .getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(), catalogSchemaIdentifierNameResolver.getSchemaName()), @@ -188,9 +180,9 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext @Override public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) { QualifiedName qualifiedName = node.getFunctionName(); - Set allowedCatalogNames = catalogService.getCatalogs() + Set allowedCatalogNames = catalogService.getDataSources() .stream() - .map(Catalog::getName) + .map(DataSource::getName) .collect(Collectors.toSet()); CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver = new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames); diff --git a/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java b/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java deleted file mode 100644 index 4c40920c7b..0000000000 --- a/core/src/main/java/org/opensearch/sql/catalog/CatalogService.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.catalog; - -import java.util.Set; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.storage.StorageEngine; - -/** - * Catalog Service manages catalogs. - */ -public interface CatalogService { - - /** - * Returns all catalog objects. - * - * @return Catalog Catalogs. - */ - Set getCatalogs(); - - /** - * Returns Catalog with corresponding to the catalog name. - * - * @param catalogName Name of the catalog. - * @return Catalog catalog. - */ - Catalog getCatalog(String catalogName); - - /** - * Default opensearch engine is not defined in catalog.json. - * So the registration of default catalog happens separately. - * - * @param storageEngine StorageEngine. - */ - void registerDefaultOpenSearchCatalog(StorageEngine storageEngine); - -} diff --git a/core/src/main/java/org/opensearch/sql/catalog/DataSourceService.java b/core/src/main/java/org/opensearch/sql/catalog/DataSourceService.java new file mode 100644 index 0000000000..75c1bc1793 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/catalog/DataSourceService.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.catalog; + +import java.util.Set; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; + +/** + * DataSource Service manage {@link DataSource}. + */ +public interface DataSourceService { + + /** + * Returns all DataSource objects. + * + * @return set of {@link DataSource}. + */ + Set getDataSources(); + + /** + * Returns {@link DataSource} with corresponding to the DataSource name. + * + * @param dataSourceName Name of the {@link DataSource}. + * @return {@link DataSource}. + */ + DataSource getDataSource(String dataSourceName); + + /** + * Register {@link DataSource} defined by {@link DataSourceMetadata}. + * + * @param dataSourceMetadata {@link DataSourceMetadata}. + */ + void addDataSource(DataSourceMetadata dataSourceMetadata); + + /** + * remove all the registered {@link DataSource}. + */ + void clear(); +} diff --git a/core/src/main/java/org/opensearch/sql/catalog/DataSourceServiceImpl.java b/core/src/main/java/org/opensearch/sql/catalog/DataSourceServiceImpl.java new file mode 100644 index 0000000000..6a900d09f7 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/catalog/DataSourceServiceImpl.java @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.catalog; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; + +/** + * This class manages catalogs and responsible for creating connectors to these catalogs. + */ +public class DataSourceServiceImpl implements DataSourceService { + + private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; + + private static final Logger LOG = LogManager.getLogger(); + + private final ConcurrentHashMap dataSourceMap; + + private final Map dataSourceFactoryMap; + + /** + * Constructor. + */ + public DataSourceServiceImpl(Set dataSourceFactories) { + dataSourceFactoryMap = + dataSourceFactories.stream() + .collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f)); + dataSourceMap = new ConcurrentHashMap<>(); + } + + @Override + public Set getDataSources() { + return new HashSet<>(dataSourceMap.values()); + } + + @Override + public DataSource getDataSource(String dataSourceName) { + if (!dataSourceMap.containsKey(dataSourceName)) { + throw new IllegalArgumentException( + String.format("DataSource with name %s doesn't exist.", dataSourceName)); + } + return dataSourceMap.get(dataSourceName); + } + + @Override + public void addDataSource(DataSourceMetadata metadata) { + validateDataSource(metadata); + dataSourceMap.put( + metadata.getName(), + dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata)); + } + + @Override + public void clear() { + dataSourceMap.clear(); + } + + /** + * This can be moved to a different validator class when we introduce more connectors. + * + * @param metadata {@link DataSourceMetadata}. + */ + private void validateDataSource(DataSourceMetadata metadata) { + if (StringUtils.isEmpty(metadata.getName())) { + throw new IllegalArgumentException( + "Missing Name Field from a DataSource. Name is a required parameter."); + } + if (!metadata.getName().matches(DATASOURCE_NAME_REGEX)) { + throw new IllegalArgumentException( + String.format( + "DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.", + metadata.getName())); + } + if (Objects.isNull(metadata.getProperties())) { + throw new IllegalArgumentException( + "Missing properties field in catalog configuration. Properties are required parameters."); + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java b/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java deleted file mode 100644 index a859090a5d..0000000000 --- a/core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.catalog.model; - -import com.fasterxml.jackson.annotation.JsonFormat; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Map; -import lombok.Getter; -import lombok.Setter; - -@JsonIgnoreProperties(ignoreUnknown = true) -@Getter -@Setter -public class CatalogMetadata { - - @JsonProperty(required = true) - private String name; - - @JsonProperty(required = true) - @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) - private ConnectorType connector; - - @JsonProperty(required = true) - private Map properties; - -} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java b/core/src/main/java/org/opensearch/sql/catalog/model/DataSource.java similarity index 86% rename from core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java rename to core/src/main/java/org/opensearch/sql/catalog/model/DataSource.java index 5b7eaca523..1256e556fe 100644 --- a/core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java +++ b/core/src/main/java/org/opensearch/sql/catalog/model/DataSource.java @@ -15,11 +15,11 @@ @Getter @RequiredArgsConstructor @EqualsAndHashCode -public class Catalog { +public class DataSource { private final String name; - private final ConnectorType connectorType; + private final DataSourceType connectorType; @EqualsAndHashCode.Exclude private final StorageEngine storageEngine; diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/catalog/model/DataSourceMetadata.java new file mode 100644 index 0000000000..53c64b38d0 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/catalog/model/DataSourceMetadata.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.catalog.model; + +import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.opensearch.sql.catalog.DataSourceService; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Getter +@Setter +@EqualsAndHashCode +public class DataSourceMetadata { + + @JsonProperty(required = true) + private String name; + + @JsonProperty(required = true) + @JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES) + private DataSourceType connector; + + @JsonProperty(required = true) + private Map properties; + + /** + * Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch + * {@link DataSource} to {@link DataSourceService}. + */ + public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(DEFAULT_CATALOG_NAME); + dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH); + dataSourceMetadata.setProperties(ImmutableMap.of()); + return dataSourceMetadata; + } +} diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java b/core/src/main/java/org/opensearch/sql/catalog/model/DataSourceType.java similarity index 69% rename from core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java rename to core/src/main/java/org/opensearch/sql/catalog/model/DataSourceType.java index b84c68adbf..f8081ab30d 100644 --- a/core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java +++ b/core/src/main/java/org/opensearch/sql/catalog/model/DataSourceType.java @@ -5,6 +5,9 @@ package org.opensearch.sql.catalog.model; -public enum ConnectorType { +/** + * Supported DataSource Type. + */ +public enum DataSourceType { PROMETHEUS,OPENSEARCH } diff --git a/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java b/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java index 1157d8e497..e746dc08e7 100644 --- a/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java +++ b/core/src/main/java/org/opensearch/sql/catalog/model/auth/AuthenticationType.java @@ -10,7 +10,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public enum AuthenticationType { diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java b/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java index 4e6a87e21b..8d36f20bb4 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTable.java @@ -11,7 +11,7 @@ import java.util.Map; import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.catalog.DataSourceService; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.planner.DefaultImplementor; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -30,7 +30,7 @@ @EqualsAndHashCode public class CatalogTable implements Table { - private final CatalogService catalogService; + private final DataSourceService catalogService; @Override public Map getFieldTypes() { @@ -47,7 +47,7 @@ public PhysicalPlan implement(LogicalPlan plan) { public static class CatalogTableDefaultImplementor extends DefaultImplementor { - private final CatalogService catalogService; + private final DataSourceService catalogService; @Override public PhysicalPlan visitRelation(LogicalRelation node, Object context) { diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java b/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java index efc59c97ec..cf9be901ac 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScan.java @@ -14,8 +14,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Set; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSource; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -30,11 +30,11 @@ */ public class CatalogTableScan extends TableScanOperator { - private final CatalogService catalogService; + private final DataSourceService catalogService; private Iterator iterator; - public CatalogTableScan(CatalogService catalogService) { + public CatalogTableScan(DataSourceService catalogService) { this.catalogService = catalogService; this.iterator = Collections.emptyIterator(); } @@ -47,8 +47,8 @@ public String explain() { @Override public void open() { List exprValues = new ArrayList<>(); - Set catalogs = catalogService.getCatalogs(); - for (Catalog catalog : catalogs) { + Set catalogs = catalogService.getDataSources(); + for (DataSource catalog : catalogs) { exprValues.add( new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( "DATASOURCE_NAME", ExprValueUtils.stringValue(catalog.getName()), diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java new file mode 100644 index 0000000000..9553264935 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTable.java @@ -0,0 +1,58 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.planner.physical.datasource; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.physical.PhysicalPlan; +import org.opensearch.sql.storage.Table; + + +/** + * Table implementation to handle show datasources command. + * Since catalog information is not tied to any storage engine, this info + * is handled via Catalog Table. + * + */ +@RequiredArgsConstructor +@EqualsAndHashCode +public class DataSourceTable implements Table { + + private final DataSourceService catalogService; + + @Override + public Map getFieldTypes() { + return DataSourceTableSchema.DATA_SOURCE_TABLE_SCHEMA.getMapping(); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return plan.accept(new CatalogTableDefaultImplementor(catalogService), null); + } + + @VisibleForTesting + @RequiredArgsConstructor + public static class CatalogTableDefaultImplementor + extends DefaultImplementor { + + private final DataSourceService catalogService; + + @Override + public PhysicalPlan visitRelation(LogicalRelation node, Object context) { + return new DataSourceTableScan(catalogService); + } + } + +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java new file mode 100644 index 0000000000..5f78cc6b7e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScan.java @@ -0,0 +1,70 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.planner.physical.datasource; + +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.storage.TableScanOperator; + +/** + * This class handles table scan of catalog table. + * Right now these are derived from catalogService thorough static fields. + * In future this might scan data from underlying datastore if we start + * persisting catalog info somewhere. + * + */ +public class DataSourceTableScan extends TableScanOperator { + + private final DataSourceService catalogService; + + private Iterator iterator; + + public DataSourceTableScan(DataSourceService catalogService) { + this.catalogService = catalogService; + this.iterator = Collections.emptyIterator(); + } + + @Override + public String explain() { + return "GetCatalogRequestRequest{}"; + } + + @Override + public void open() { + List exprValues = new ArrayList<>(); + Set catalogs = catalogService.getDataSources(); + for (DataSource catalog : catalogs) { + exprValues.add( + new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( + "DATASOURCE_NAME", ExprValueUtils.stringValue(catalog.getName()), + "CONNECTOR_TYPE", ExprValueUtils.stringValue(catalog.getConnectorType().name()))))); + } + iterator = exprValues.iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ExprValue next() { + return iterator.next(); + } + +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java new file mode 100644 index 0000000000..47cb136439 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableSchema.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical.datasource; + +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import java.util.LinkedHashMap; +import java.util.Map; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.type.ExprType; + +/** + * Definition of the system table schema. + */ +@Getter +@RequiredArgsConstructor +public enum DataSourceTableSchema { + + DATA_SOURCE_TABLE_SCHEMA(new LinkedHashMap<>() { + { + put("DATASOURCE_NAME", STRING); + put("CONNECTOR_TYPE", STRING); + } + } + ); + private final Map mapping; +} diff --git a/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java new file mode 100644 index 0000000000..3cb63b54e9 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/storage/DataSourceFactory.java @@ -0,0 +1,28 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.storage; + +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; + +/** + * Each builtin {@link DataSourceType} mapping to DataSourceFactory implementation. + * DataSourceFactory is used to create {@link DataSource} based on configuration. + */ +public interface DataSourceFactory { + /** + * Get {@link DataSourceType}. + */ + DataSourceType getDataSourceType(); + + /** + * Create {@link DataSource}. + */ + DataSource createDataSource(DataSourceMetadata metadata); +} diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java deleted file mode 100644 index 4cc27f6fa0..0000000000 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngineFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.storage; - -import java.util.Map; -import org.opensearch.sql.catalog.model.ConnectorType; - -public interface StorageEngineFactory { - - ConnectorType getConnectorType(); - - StorageEngine getStorageEngine(String catalogName, Map requiredConfig); - -} diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index 97c560d505..0492e05775 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -88,7 +88,7 @@ import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; -import org.opensearch.sql.planner.physical.catalog.CatalogTable; +import org.opensearch.sql.planner.physical.datasource.DataSourceTable; import org.springframework.context.annotation.Configuration; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; @@ -1078,7 +1078,7 @@ public void table_function_with_wrong_table_function() { @Test public void show_catalogs() { - assertAnalyzeEqual(new LogicalRelation(".CATALOGS", new CatalogTable(catalogService)), + assertAnalyzeEqual(new LogicalRelation(".CATALOGS", new DataSourceTable(catalogService)), AstDSL.relation(qualifiedName(".CATALOGS"))); } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index 447802c963..d5deee3374 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -15,16 +15,16 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang3.tuple.Pair; -import org.opensearch.sql.CatalogSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; import org.opensearch.sql.analysis.symbol.Symbol; import org.opensearch.sql.analysis.symbol.SymbolTable; import org.opensearch.sql.ast.tree.UnresolvedPlan; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; import org.opensearch.sql.config.TestConfig; import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; @@ -96,8 +96,8 @@ public PhysicalPlan implement(LogicalPlan plan) { } @Bean - protected CatalogService catalogService() { - return new DefaultCatalogService(); + protected DataSourceService catalogService() { + return new DefaultDataSourceService(); } @@ -143,16 +143,17 @@ protected Environment typeEnv() { protected Table table; @Autowired - protected CatalogService catalogService; + protected DataSourceService catalogService; @Autowired protected Environment typeEnv; @Bean - protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer, CatalogService catalogService, - StorageEngine storageEngine, BuiltinFunctionRepository functionRepository, - Table table) { - catalogService.registerDefaultOpenSearchCatalog(storageEngine); + protected Analyzer analyzer( + ExpressionAnalyzer expressionAnalyzer, + DataSourceService dataSourceService, + BuiltinFunctionRepository functionRepository, + Table table) { functionRepository.register("prometheus", new FunctionResolver() { @Override @@ -170,7 +171,7 @@ public FunctionName getFunctionName() { return FunctionName.of("query_range"); } }); - return new Analyzer(expressionAnalyzer, catalogService, functionRepository); + return new Analyzer(expressionAnalyzer, dataSourceService, functionRepository); } @Bean @@ -196,26 +197,31 @@ protected LogicalPlan analyze(UnresolvedPlan unresolvedPlan) { return analyzer.analyze(unresolvedPlan, analysisContext); } - private class DefaultCatalogService implements CatalogService { + private class DefaultDataSourceService implements DataSourceService { private StorageEngine storageEngine = storageEngine(); - private final Catalog catalog - = new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine); + private final DataSource catalog + = new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine); @Override - public Set getCatalogs() { + public Set getDataSources() { return ImmutableSet.of(catalog); } @Override - public Catalog getCatalog(String catalogName) { + public DataSource getDataSource(String dataSourceName) { return catalog; } @Override - public void registerDefaultOpenSearchCatalog(StorageEngine storageEngine) { - this.storageEngine = storageEngine; + public void addDataSource(DataSourceMetadata metadata) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); } } diff --git a/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java b/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java new file mode 100644 index 0000000000..2e4cc85a9c --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/catalog/DataSourceServiceImplTest.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.catalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; + +import com.google.common.collect.ImmutableMap; +import java.util.HashSet; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; +import org.opensearch.sql.storage.DataSourceFactory; +import org.opensearch.sql.storage.StorageEngine; + +@ExtendWith(MockitoExtension.class) +class DataSourceServiceImplTest { + + static final String NAME = "opensearch"; + + @Mock private DataSourceFactory dataSourceFactory; + + @Mock private StorageEngine storageEngine; + + private DataSourceService dataSourceService; + + @BeforeEach + public void setup() { + lenient() + .doAnswer( + invocation -> { + DataSourceMetadata metadata = invocation.getArgument(0); + return new DataSource(metadata.getName(), metadata.getConnector(), storageEngine); + }) + .when(dataSourceFactory) + .createDataSource(any()); + when(dataSourceFactory.getDataSourceType()).thenReturn(DataSourceType.OPENSEARCH); + dataSourceService = + new DataSourceServiceImpl( + new HashSet<>() { + { + add(dataSourceFactory); + } + }); + } + + @Test + void getDataSourceSuccess() { + dataSourceService.addDataSource(DataSourceMetadata.defaultOpenSearchDataSourceMetadata()); + + assertEquals( + new DataSource(DEFAULT_CATALOG_NAME, DataSourceType.OPENSEARCH, storageEngine), + dataSourceService.getDataSource(DEFAULT_CATALOG_NAME)); + } + + @Test + void getNotExistDataSourceShouldFail() { + IllegalArgumentException exception = + assertThrows(IllegalArgumentException.class, () -> dataSourceService.getDataSource("mock")); + assertEquals("DataSource with name mock doesn't exist.", exception.getMessage()); + } + + @Test + void getAddDataSourcesShouldSuccess() { + assertEquals(0, dataSourceService.getDataSources().size()); + + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + } + + @Test + void noDataSourceExistAfterClear() { + dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, ImmutableMap.of())); + assertEquals(1, dataSourceService.getDataSources().size()); + + dataSourceService.clear(); + assertEquals(0, dataSourceService.getDataSources().size()); + } + + @Test + void metaDataMissingNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata(null, DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "Missing Name Field from a DataSource. Name is a required parameter.", + exception.getMessage()); + } + + @Test + void metaDataHasIllegalDataSourceNameShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + dataSourceService.addDataSource( + metadata("prometheus.test", DataSourceType.OPENSEARCH, ImmutableMap.of()))); + assertEquals( + "DataSource Name: prometheus.test contains illegal characters. " + + "Allowed characters: a-zA-Z0-9_-*@.", + exception.getMessage()); + } + + @Test + void metaDataMissingPropertiesShouldFail() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> dataSourceService.addDataSource(metadata(NAME, DataSourceType.OPENSEARCH, null))); + assertEquals( + "Missing properties field in catalog configuration. Properties are required parameters.", + exception.getMessage()); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java b/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java new file mode 100644 index 0000000000..3fdf778b1a --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/catalog/model/auth/AuthenticationTypeTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.catalog.model.auth; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +class AuthenticationTypeTest { + @Test + void getAuthType() { + assertEquals( + AuthenticationType.BASICAUTH, + AuthenticationType.get(AuthenticationType.BASICAUTH.getName())); + assertEquals( + AuthenticationType.AWSSIGV4AUTH, + AuthenticationType.get(AuthenticationType.AWSSIGV4AUTH.getName())); + } + + @Test + void getNotExistAuthType() { + assertNull(AuthenticationType.get("mock")); + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java index cf9b5fe016..f01f3025f3 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/catalog/CatalogTableScanTest.java @@ -21,9 +21,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceType; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValueUtils; import org.opensearch.sql.storage.StorageEngine; @@ -32,7 +32,7 @@ public class CatalogTableScanTest { @Mock - private CatalogService catalogService; + private DataSourceService catalogService; @Mock private StorageEngine storageEngine; @@ -51,15 +51,15 @@ void testExplain() { @Test void testIterator() { - Set catalogSet = new HashSet<>(); - catalogSet.add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - catalogSet.add(new Catalog("opensearch", ConnectorType.OPENSEARCH, storageEngine)); - when(catalogService.getCatalogs()).thenReturn(catalogSet); + Set catalogSet = new HashSet<>(); + catalogSet.add(new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine)); + catalogSet.add(new DataSource("opensearch", DataSourceType.OPENSEARCH, storageEngine)); + when(catalogService.getDataSources()).thenReturn(catalogSet); assertFalse(catalogTableScan.hasNext()); catalogTableScan.open(); assertTrue(catalogTableScan.hasNext()); - for (Catalog catalog : catalogSet) { + for (DataSource catalog : catalogSet) { assertEquals(new ExprTupleValue(new LinkedHashMap<>(ImmutableMap.of( "DATASOURCE_NAME", ExprValueUtils.stringValue(catalog.getName()), "CONNECTOR_TYPE", ExprValueUtils.stringValue(catalog.getConnectorType().name())))), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index bcd0c0ffb8..73620dd594 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -6,9 +6,11 @@ package org.opensearch.sql.ppl; +import static org.opensearch.sql.catalog.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -20,7 +22,8 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.DataSourceServiceImpl; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.DefaultQueryManager; @@ -37,14 +40,14 @@ import org.opensearch.sql.opensearch.client.OpenSearchRestClient; import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; -import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.sql.storage.DataSourceFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -75,9 +78,12 @@ public void init() { new OpenSearchExecutionProtector(new AlwaysHealthyMonitor()))); context.registerBean(OpenSearchClient.class, () -> client); context.registerBean(Settings.class, () -> defaultSettings()); - OpenSearchStorageEngine openSearchStorageEngine = new OpenSearchStorageEngine(client, defaultSettings()); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(openSearchStorageEngine); - context.registerBean(CatalogService.class, CatalogServiceImpl::getInstance); + DataSourceService dataSourceService = new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory(client, defaultSettings())) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + context.registerBean(DataSourceService.class, () -> dataSourceService); context.register(StandaloneConfig.class); context.register(PPLServiceConfig.class); context.refresh(); @@ -169,7 +175,7 @@ public InternalRestHighLevelClient(RestClient restClient) { @Import({ExpressionConfig.class}) static class StandaloneConfig { @Autowired - private CatalogService catalogService; + private DataSourceService catalogService; @Autowired private ExecutionEngine executionEngine; @@ -183,7 +189,7 @@ QueryManager queryManager() { @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { catalogService - .getCatalogs() + .getDataSources() .forEach( catalog -> catalog diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java index 4c9afe802e..327a56aa82 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/plugin/RestSQLQueryActionTest.java @@ -27,7 +27,7 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.catalog.DataSourceService; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryManager; @@ -66,7 +66,7 @@ public void setup() { context = new AnnotationConfigApplicationContext(); context.registerBean(StorageEngine.class, () -> Mockito.mock(StorageEngine.class)); context.registerBean(ExecutionEngine.class, () -> Mockito.mock(ExecutionEngine.class)); - context.registerBean(CatalogService.class, () -> Mockito.mock(CatalogService.class)); + context.registerBean(DataSourceService.class, () -> Mockito.mock(DataSourceService.class)); context.registerBean(QueryManager.class, () -> queryManager); context.registerBean(QueryPlanFactory.class, () -> factory); context.register(SQLServiceConfig.class); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java new file mode 100644 index 0000000000..7248a9edd4 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.storage.DataSourceFactory; + +@RequiredArgsConstructor +public class OpenSearchDataSourceFactory implements DataSourceFactory { + + /** OpenSearch client connection. */ + private final OpenSearchClient client; + + private final Settings settings; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.OPENSEARCH; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource(metadata.getName(), DataSourceType.OPENSEARCH, + new OpenSearchStorageEngine(client, settings)); + } +} diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java new file mode 100644 index 0000000000..4edf618a02 --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDataSourceFactoryTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; +import org.opensearch.sql.opensearch.client.OpenSearchClient; + +@ExtendWith(MockitoExtension.class) +class OpenSearchDataSourceFactoryTest { + + @Mock private OpenSearchClient client; + + @Mock private Settings settings; + + @Mock private DataSourceMetadata dataSourceMetadata; + + private OpenSearchDataSourceFactory factory; + + @BeforeEach + public void setup() { + factory = new OpenSearchDataSourceFactory(client, settings); + } + + @Test + void getDataSourceType() { + assertEquals(DataSourceType.OPENSEARCH, factory.getDataSourceType()); + } + + @Test + void createDataSource() { + when(dataSourceMetadata.getName()).thenReturn("opensearch"); + + DataSource dataSource = factory.createDataSource(dataSourceMetadata); + assertEquals("opensearch", dataSource.getName()); + assertEquals(DataSourceType.OPENSEARCH, dataSource.getConnectorType()); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index e1d29a5775..ab2e858517 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,13 +5,27 @@ package org.opensearch.sql.plugin; +import static org.opensearch.sql.catalog.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Objects; +import java.util.Set; import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; @@ -40,7 +54,10 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.DataSourceServiceImpl; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; @@ -50,10 +67,9 @@ import org.opensearch.sql.opensearch.security.SecurityAccess; import org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; -import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; +import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.opensearch.storage.script.ExpressionScriptEngine; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; -import org.opensearch.sql.plugin.catalog.CatalogServiceImpl; import org.opensearch.sql.plugin.catalog.CatalogSettings; import org.opensearch.sql.plugin.config.OpenSearchPluginConfig; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; @@ -63,8 +79,9 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.config.PPLServiceConfig; +import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.sql.config.SQLServiceConfig; -import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -73,6 +90,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin { + private static final Logger LOG = LogManager.getLogger(); + private ClusterService clusterService; /** @@ -84,6 +103,8 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Rel private AnnotationConfigApplicationContext applicationContext; + private DataSourceService dataSourceService; + public String name() { return "sql"; } @@ -142,8 +163,15 @@ public Collection createComponents( this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); this.client = (NodeClient) client; - CatalogServiceImpl.getInstance().loadConnectors(clusterService.getSettings()); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(openSearchStorageEngine()); + this.dataSourceService = + new DataSourceServiceImpl( + new ImmutableSet.Builder() + .add(new OpenSearchDataSourceFactory( + new OpenSearchNodeClient(this.client), pluginSettings)) + .add(new PrometheusStorageFactory()) + .build()); + dataSourceService.addDataSource(defaultOpenSearchDataSourceMetadata()); + loadDataSources(dataSourceService, clusterService.getSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); @@ -155,7 +183,7 @@ public Collection createComponents( applicationContext.registerBean( org.opensearch.sql.common.setting.Settings.class, () -> pluginSettings); applicationContext.registerBean( - CatalogService.class, () -> CatalogServiceImpl.getInstance()); + DataSourceService.class, () -> dataSourceService); applicationContext.register(OpenSearchPluginConfig.class); applicationContext.register(PPLServiceConfig.class); applicationContext.register(SQLServiceConfig.class); @@ -195,12 +223,49 @@ public ScriptEngine getScriptEngine(Settings settings, Collection { + InputStream inputStream = CatalogSettings.CATALOG_CONFIG.get(settings); + if (inputStream != null) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { + List metadataList = + objectMapper.readValue(inputStream, new TypeReference<>() {}); + verifyDuplicateName(metadataList); + metadataList.forEach(metadata -> dataSourceService.addDataSource(metadata)); + } catch (IOException e) { + LOG.error( + "DataSource Configuration File uploaded is malformed. Verify and re-upload.", e); + } catch (Throwable e) { + LOG.error("DataSource construction failed.", e); + } + } + return null; + }); } + static void verifyDuplicateName(List metadataList) { + Set seenNames = new HashSet<>(); + for (DataSourceMetadata metadata : metadataList) { + if (seenNames.contains(metadata.getName())) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Datasource name should be unique, Duplicate datasource found %s", + metadata.getName())); + } + seenNames.add(metadata.getName()); + } + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java deleted file mode 100644 index 3c6e0e5281..0000000000 --- a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.catalog; - -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.catalog.CatalogService; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.CatalogMetadata; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.opensearch.security.SecurityAccess; -import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; -import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; - -/** - * This class manages catalogs and responsible for creating connectors to these catalogs. - */ -public class CatalogServiceImpl implements CatalogService { - - private static final CatalogServiceImpl INSTANCE = new CatalogServiceImpl(); - - private static final String CATALOG_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; - - private static final Logger LOG = LogManager.getLogger(); - - private Map catalogMap = new HashMap<>(); - - private final Map connectorTypeStorageEngineFactoryMap; - - public static CatalogServiceImpl getInstance() { - return INSTANCE; - } - - private CatalogServiceImpl() { - connectorTypeStorageEngineFactoryMap = new HashMap<>(); - PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - connectorTypeStorageEngineFactoryMap.put(prometheusStorageFactory.getConnectorType(), - prometheusStorageFactory); - } - - /** - * This function reads settings and loads connectors to the data stores. - * This will be invoked during start up and also when settings are updated. - * - * @param settings settings. - */ - public void loadConnectors(Settings settings) { - SecurityAccess.doPrivileged(() -> { - InputStream inputStream = CatalogSettings.CATALOG_CONFIG.get(settings); - if (inputStream != null) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - try { - List catalogs = - objectMapper.readValue(inputStream, new TypeReference<>() { - }); - validateCatalogs(catalogs); - constructConnectors(catalogs); - } catch (IOException e) { - LOG.error("Catalog Configuration File uploaded is malformed. Verify and re-upload.", e); - } catch (Throwable e) { - LOG.error("Catalog construction failed.", e); - } - } - return null; - }); - } - - @Override - public Set getCatalogs() { - return new HashSet<>(catalogMap.values()); - } - - @Override - public Catalog getCatalog(String catalogName) { - if (!catalogMap.containsKey(catalogName)) { - throw new IllegalArgumentException( - String.format("Catalog with name %s doesn't exist.", catalogName)); - } - return catalogMap.get(catalogName); - } - - - @Override - public void registerDefaultOpenSearchCatalog(StorageEngine storageEngine) { - if (storageEngine == null) { - throw new IllegalArgumentException("Default storage engine can't be null"); - } - catalogMap.put(DEFAULT_CATALOG_NAME, - new Catalog(DEFAULT_CATALOG_NAME, ConnectorType.OPENSEARCH, storageEngine)); - } - - private StorageEngine createStorageEngine(CatalogMetadata catalog) { - ConnectorType connector = catalog.getConnector(); - switch (connector) { - case PROMETHEUS: - return connectorTypeStorageEngineFactoryMap - .get(catalog.getConnector()) - .getStorageEngine(catalog.getName(), catalog.getProperties()); - default: - throw new IllegalStateException( - String.format("Unsupported Connector: %s", connector.name())); - } - } - - private void constructConnectors(List catalogs) { - catalogMap = new HashMap<>(); - for (CatalogMetadata catalog : catalogs) { - try { - String catalogName = catalog.getName(); - StorageEngine storageEngine = createStorageEngine(catalog); - catalogMap.put(catalogName, - new Catalog(catalog.getName(), catalog.getConnector(), storageEngine)); - } catch (Throwable e) { - LOG.error("Catalog : {} storage engine creation failed with the following message: {}", - catalog.getName(), e.getMessage(), e); - } - } - } - - /** - * This can be moved to a different validator class - * when we introduce more connectors. - * - * @param catalogs catalogs. - */ - private void validateCatalogs(List catalogs) { - - Set reviewedCatalogs = new HashSet<>(); - for (CatalogMetadata catalog : catalogs) { - - if (StringUtils.isEmpty(catalog.getName())) { - throw new IllegalArgumentException( - "Missing Name Field from a catalog. Name is a required parameter."); - } - - if (!catalog.getName().matches(CATALOG_NAME_REGEX)) { - throw new IllegalArgumentException( - String.format("Catalog Name: %s contains illegal characters." - + " Allowed characters: a-zA-Z0-9_-*@ ", catalog.getName())); - } - - String catalogName = catalog.getName(); - if (reviewedCatalogs.contains(catalogName)) { - throw new IllegalArgumentException("Catalogs with same name are not allowed."); - } else { - reviewedCatalogs.add(catalogName); - } - - if (Objects.isNull(catalog.getProperties())) { - throw new IllegalArgumentException("Missing properties field in catalog configuration. " - + "Properties are required parameters"); - } - - } - } - - -} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java index 596296522c..b8b0409d74 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginConfig.java @@ -12,7 +12,7 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.analysis.ExpressionAnalyzer; -import org.opensearch.sql.catalog.CatalogService; +import org.opensearch.sql.catalog.DataSourceService; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryManager; @@ -56,7 +56,7 @@ public class OpenSearchPluginConfig { private Settings settings; @Autowired - private CatalogService catalogService; + private DataSourceService catalogService; @Bean @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) @@ -103,7 +103,7 @@ public QueryManager queryManager() { @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public QueryPlanFactory queryExecutionFactory(BuiltinFunctionRepository functionRepository) { catalogService - .getCatalogs() + .getDataSources() .forEach( catalog -> catalog diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java deleted file mode 100644 index cdbce55cb1..0000000000 --- a/plugin/src/test/java/org/opensearch/sql/plugin/catalog/CatalogServiceImplTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.plugin.catalog; - -import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashSet; -import java.util.Set; -import lombok.SneakyThrows; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.common.settings.MockSecureSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.sql.catalog.model.Catalog; -import org.opensearch.sql.catalog.model.ConnectorType; -import org.opensearch.sql.storage.StorageEngine; - -@RunWith(MockitoJUnitRunner.class) -public class CatalogServiceImplTest { - - public static final String CATALOG_SETTING_METADATA_KEY = - "plugins.query.federation.datasources.config"; - - @Mock - private StorageEngine storageEngine; - - @SneakyThrows - @Test - public void testLoadConnectors() { - Settings settings = getCatalogSettings("catalogs.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - - @SneakyThrows - @Test - public void testLoadConnectorsWithMultipleCatalogs() { - Settings settings = getCatalogSettings("multiple_catalogs.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>() {{ - add(new Catalog("prometheus", ConnectorType.PROMETHEUS, storageEngine)); - add(new Catalog("prometheus-1", ConnectorType.PROMETHEUS, storageEngine)); - }}; - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMissingName() { - Settings settings = getCatalogSettings("catalog_missing_name.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithDuplicateCatalogNames() { - Settings settings = getCatalogSettings("duplicate_catalog_names.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithMalformedJson() { - Settings settings = getCatalogSettings("malformed_catalogs.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testGetStorageEngineAfterGetCatalogs() { - Settings settings = getCatalogSettings("empty_catalog.json"); - CatalogServiceImpl.getInstance().loadConnectors(settings); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(storageEngine); - Set expected = new HashSet<>(); - expected.add(new Catalog(DEFAULT_CATALOG_NAME, ConnectorType.OPENSEARCH, storageEngine)); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - Assert.assertEquals(storageEngine, - CatalogServiceImpl.getInstance().getCatalog(DEFAULT_CATALOG_NAME).getStorageEngine()); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - Assert.assertEquals(storageEngine, - CatalogServiceImpl.getInstance().getCatalog(DEFAULT_CATALOG_NAME).getStorageEngine()); - IllegalArgumentException illegalArgumentException - = Assert.assertThrows(IllegalArgumentException.class, - () -> CatalogServiceImpl.getInstance().getCatalog("test")); - Assert.assertEquals("Catalog with name test doesn't exist.", - illegalArgumentException.getMessage()); - } - - - @SneakyThrows - @Test - public void testGetStorageEngineAfterLoadingConnectors() { - Settings settings = getCatalogSettings("empty_catalog.json"); - CatalogServiceImpl.getInstance().registerDefaultOpenSearchCatalog(storageEngine); - //Load Connectors will empty the catalogMap.So OpenSearch Storage Engine - CatalogServiceImpl.getInstance().loadConnectors(settings); - Set expected = new HashSet<>(); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - @SneakyThrows - @Test - public void testLoadConnectorsWithIllegalCatalogNames() { - Settings settings = getCatalogSettings("illegal_catalog_name.json"); - Set expected = CatalogServiceImpl.getInstance().getCatalogs(); - CatalogServiceImpl.getInstance().loadConnectors(settings); - Assert.assertEquals(expected, CatalogServiceImpl.getInstance().getCatalogs()); - } - - private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException { - MockSecureSettings mockSecureSettings = new MockSecureSettings(); - ClassLoader classLoader = getClass().getClassLoader(); - Path filepath = Paths.get(classLoader.getResource(filename).toURI()); - mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); - return Settings.builder().setSecureSettings(mockSecureSettings).build(); - } - -} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java new file mode 100644 index 0000000000..7dbfb226cc --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/datasource/CatalogServiceImplTest.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.datasource; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.common.settings.MockSecureSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.sql.catalog.DataSourceService; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; +import org.opensearch.sql.plugin.SQLPlugin; + +@RunWith(MockitoJUnitRunner.class) +public class CatalogServiceImplTest { + + public static final String CATALOG_SETTING_METADATA_KEY = + "plugins.query.federation.datasources.config"; + + @Mock + private DataSourceService dataSourceService; + + @SneakyThrows + @Test + public void testLoadConnectors() { + Settings settings = getCatalogSettings("catalogs.json"); + loadConnectors(settings); + List expected = + new ArrayList<>() { + { + add( + metadata( + "prometheus", + DataSourceType.PROMETHEUS, + ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type"))); + } + }; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMultipleCatalogs() { + Settings settings = getCatalogSettings("multiple_catalogs.json"); + loadConnectors(settings); + List expected = new ArrayList<>() {{ + add(metadata("prometheus", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "basicauth", + "prometheus.auth.username", "admin", + "prometheus.auth.password", "type" + ))); + add(metadata("prometheus-1", DataSourceType.PROMETHEUS, ImmutableMap.of( + "prometheus.uri", "http://localhost:9090", + "prometheus.auth.type", "awssigv4", + "prometheus.auth.region", "us-east-1", + "prometheus.auth.access_key", "accessKey", + "prometheus.auth.secret_key", "secretKey" + ))); + }}; + + verifyAddDataSourceWithMetadata(expected); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithDuplicateCatalogNames() { + Settings settings = getCatalogSettings("duplicate_catalog_names.json"); + loadConnectors(settings); + + verify(dataSourceService, never()).addDataSource(any()); + } + + @SneakyThrows + @Test + public void testLoadConnectorsWithMalformedJson() { + Settings settings = getCatalogSettings("malformed_catalogs.json"); + loadConnectors(settings); + + verify(dataSourceService, never()).addDataSource(any()); + } + + private Settings getCatalogSettings(String filename) throws URISyntaxException, IOException { + MockSecureSettings mockSecureSettings = new MockSecureSettings(); + ClassLoader classLoader = getClass().getClassLoader(); + Path filepath = Paths.get(classLoader.getResource(filename).toURI()); + mockSecureSettings.setFile(CATALOG_SETTING_METADATA_KEY, Files.readAllBytes(filepath)); + return Settings.builder().setSecureSettings(mockSecureSettings).build(); + } + + void loadConnectors(Settings settings) { + SQLPlugin.loadDataSources(dataSourceService, settings); + } + + void verifyAddDataSourceWithMetadata(List metadataList) { + int expectCount = metadataList.size(); + ArgumentCaptor metadataCaptor = + ArgumentCaptor.forClass(DataSourceMetadata.class); + verify(dataSourceService, times(expectCount)).addDataSource(metadataCaptor.capture()); + List actualValues = metadataCaptor.getAllValues(); + assertEquals(metadataList, actualValues); + } + + DataSourceMetadata metadata(String name, DataSourceType type, Map properties) { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName(name); + dataSourceMetadata.setConnector(type); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index ef8ec25df8..02f45b99f8 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -16,8 +16,8 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.opensearch.sql.catalog.CatalogService; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.catalog.DataSourceService; import org.opensearch.sql.executor.DefaultQueryManager; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; @@ -52,7 +52,7 @@ public class PPLServiceTest { private ExecutionEngine executionEngine; @Mock - private CatalogService catalogService; + private DataSourceService catalogService; @Mock private ExecutionEngine.Schema schema; @@ -66,7 +66,7 @@ public void setUp() { context.registerBean(QueryPlanFactory.class, () -> new QueryPlanFactory(queryService)); context.registerBean(StorageEngine.class, () -> storageEngine); context.registerBean(ExecutionEngine.class, () -> executionEngine); - context.registerBean(CatalogService.class, () -> catalogService); + context.registerBean(DataSourceService.class, () -> catalogService); context.register(PPLServiceConfig.class); context.refresh(); pplService = context.getBean(PPLService.class); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index 41cbf3748f..a08bf02c4b 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -14,16 +14,18 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; import org.opensearch.sql.catalog.model.auth.AuthenticationType; import org.opensearch.sql.prometheus.authinterceptors.AwsSigningInterceptor; import org.opensearch.sql.prometheus.authinterceptors.BasicAuthenticationInterceptor; import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.client.PrometheusClientImpl; +import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.StorageEngineFactory; -public class PrometheusStorageFactory implements StorageEngineFactory { +public class PrometheusStorageFactory implements DataSourceFactory { public static final String URI = "prometheus.uri"; public static final String AUTH_TYPE = "prometheus.auth.type"; @@ -33,14 +35,20 @@ public class PrometheusStorageFactory implements StorageEngineFactory { public static final String ACCESS_KEY = "prometheus.auth.access_key"; public static final String SECRET_KEY = "prometheus.auth.secret_key"; - @Override - public ConnectorType getConnectorType() { - return ConnectorType.PROMETHEUS; + public DataSourceType getDataSourceType() { + return DataSourceType.PROMETHEUS; } @Override - public StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { + public DataSource createDataSource(DataSourceMetadata metadata) { + return new DataSource( + metadata.getName(), + DataSourceType.PROMETHEUS, + getStorageEngine(metadata.getName(), metadata.getProperties())); + } + + StorageEngine getStorageEngine(String catalogName, Map requiredConfig) { validateFieldsInConfig(requiredConfig, Set.of(URI)); PrometheusClient prometheusClient; try { diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java index 1b54cde5d9..e3553651a0 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactoryTest.java @@ -13,7 +13,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.catalog.model.ConnectorType; +import org.opensearch.sql.catalog.model.DataSource; +import org.opensearch.sql.catalog.model.DataSourceMetadata; +import org.opensearch.sql.catalog.model.DataSourceType; import org.opensearch.sql.storage.StorageEngine; @ExtendWith(MockitoExtension.class) @@ -22,7 +24,8 @@ public class PrometheusStorageFactoryTest { @Test void testGetConnectorType() { PrometheusStorageFactory prometheusStorageFactory = new PrometheusStorageFactory(); - Assertions.assertEquals(ConnectorType.PROMETHEUS, prometheusStorageFactory.getConnectorType()); + Assertions.assertEquals( + DataSourceType.PROMETHEUS, prometheusStorageFactory.getDataSourceType()); } @Test @@ -130,6 +133,21 @@ void testGetStorageEngineWithInvalidURISyntax() { exception.getMessage().contains("Prometheus Client creation failed due to:")); } + @Test + void createDataSourceSuccess() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "http://dummyprometheus:9090"); + properties.put("prometheus.auth.type", "basicauth"); + properties.put("prometheus.auth.username", "admin"); + properties.put("prometheus.auth.password", "admin"); + DataSourceMetadata metadata = new DataSourceMetadata(); + metadata.setName("prometheus"); + metadata.setConnector(DataSourceType.PROMETHEUS); + metadata.setProperties(properties); + + DataSource dataSource = new PrometheusStorageFactory().createDataSource(metadata); + Assertions.assertTrue(dataSource.getStorageEngine() instanceof PrometheusStorageEngine); + } }