Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catalog to Datasource changes #1086

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jacocoTestReport {
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it,
exclude: ['**/ast/**', '**/catalog/model/**'])
exclude: ['**/ast/**', '**/datasource/model/**'])
}))
}
}
Expand All @@ -85,7 +85,7 @@ jacocoTestCoverageVerification {
excludes = [
'org.opensearch.sql.utils.MLCommonsConstants',
'org.opensearch.sql.utils.Constants',
'org.opensearch.sql.catalog.model.*'
'org.opensearch.sql.datasource.model.*'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

@Getter
@RequiredArgsConstructor
public class CatalogSchemaName {
public class DataSourceSchemaName {

private final String catalogName;
private final String dataSourceName;

private final String schemaName;

Expand Down
67 changes: 31 additions & 36 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -37,7 +29,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -69,10 +61,10 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
Expand Down Expand Up @@ -101,7 +93,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand All @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final NamedExpressionAnalyzer namedExpressionAnalyzer;

private final CatalogService catalogService;
private final DataSourceService dataSourceService;

private final BuiltinFunctionRepository repository;

Expand All @@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService,
DataSourceService dataSourceService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
this.dataSourceService = dataSourceService;
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
this.repository = repository;
Expand All @@ -142,25 +134,27 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
if (DATASOURCES_TABLE_NAME.equals(tableName)) {
table = new DataSourceTable(dataSourceService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
table = dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
catalogSchemaIdentifierNameResolver.getSchemaName()),
catalogSchemaIdentifierNameResolver.getIdentifierName());
.getTable(new DataSourceSchemaName(
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

Expand Down Expand Up @@ -188,28 +182,29 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);

FunctionName functionName
= FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName());
= FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
curEnv.define(new Symbol(Namespace.INDEX_NAME,
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,53 @@
import java.util.List;
import java.util.Set;

public class CatalogSchemaIdentifierNameResolver {
public class DataSourceSchemaIdentifierNameResolver {

public static final String DEFAULT_CATALOG_NAME = "@opensearch";
public static final String DEFAULT_DATASOURCE_NAME = "@opensearch";
public static final String DEFAULT_SCHEMA_NAME = "default";
public static final String INFORMATION_SCHEMA_NAME = "information_schema";

private String catalogName = DEFAULT_CATALOG_NAME;
private String dataSourceName = DEFAULT_DATASOURCE_NAME;
private String schemaName = DEFAULT_SCHEMA_NAME;
private String identifierName;

private static final String DOT = ".";

/**
* Data model for capturing catalog, schema and identifier from
* Data model for capturing dataSourceName, schema and identifier from
* fully qualifiedName. In the current state, it is used to capture
* CatalogSchemaTable name and CatalogSchemaFunction in case of table
* DataSourceSchemaTable name and DataSourceSchemaFunction in case of table
* functions.
*
* @param parts parts of qualifiedName.
* @param allowedCatalogs allowedCatalogs.
* @param allowedDataSources allowedDataSources.
*/
public CatalogSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
List<String> remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs));
public DataSourceSchemaIdentifierNameResolver(List<String> parts,
Set<String> allowedDataSources) {
List<String> remainingParts
= captureSchemaName(captureDataSourceName(parts, allowedDataSources));
identifierName = String.join(DOT, remainingParts);
}

public String getIdentifierName() {
return identifierName;
}

public String getCatalogName() {
return catalogName;
public String getDataSourceName() {
return dataSourceName;
}

public String getSchemaName() {
return schemaName;
}


// Capture catalog name and return remaining parts(schema name and table name)
// Capture datasource name and return remaining parts(schema name and table name)
// from the fully qualified name.
private List<String> captureCatalogName(List<String> parts, Set<String> allowedCatalogs) {
if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0))
|| DEFAULT_CATALOG_NAME.equals(parts.get(0))) {
catalogName = parts.get(0);
private List<String> captureDataSourceName(List<String> parts, Set<String> allowedDataSources) {
if (parts.size() > 1 && allowedDataSources.contains(parts.get(0))
|| DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) {
dataSourceName = parts.get(0);
return parts.subList(1, parts.size());
} else {
return parts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String getAlias() {

/**
* Get Qualified name preservs parts of the user given identifiers.
* This can later be utilized to determine Catalog,Schema and Table Name during
* This can later be utilized to determine DataSource,Schema and Table Name during
* Analyzer stage. So Passing QualifiedName directly to Analyzer Stage.
*
* @return TableQualifiedName.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.storage.StorageEngine;

/**
* DataSource Service manages datasources.
*/
public interface DataSourceService {

/**
* Returns all datasource objects.
*
* @return DataSource datasources.
*/
Set<DataSource> getDataSources();

/**
* Returns DataSource with corresponding to the datasource name.
*
* @param dataSourceName Name of the datasource.
* @return DataSource datasource.
*/
DataSource getDataSource(String dataSourceName);

/**
* Default opensearch engine is not defined in datasources config.
* So the registration of default datasource happens separately.
*
* @param storageEngine StorageEngine.
*/
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

public enum ConnectorType {
PROMETHEUS,OPENSEARCH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -15,7 +15,7 @@
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class Catalog {
public class DataSource {

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand All @@ -15,7 +15,7 @@
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
public class CatalogMetadata {
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;
Expand Down
Loading