Skip to content

Commit

Permalink
Catalog to Datasource changes (#1086)
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <[email protected]>
  • Loading branch information
vmmusings authored Nov 18, 2022
1 parent 81c9285 commit fef20f8
Show file tree
Hide file tree
Showing 74 changed files with 745 additions and 739 deletions.
4 changes: 2 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jacocoTestReport {
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it,
exclude: ['**/ast/**', '**/catalog/model/**'])
exclude: ['**/ast/**', '**/datasource/model/**'])
}))
}
}
Expand All @@ -85,7 +85,7 @@ jacocoTestCoverageVerification {
excludes = [
'org.opensearch.sql.utils.MLCommonsConstants',
'org.opensearch.sql.utils.Constants',
'org.opensearch.sql.catalog.model.*'
'org.opensearch.sql.datasource.model.*'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

@Getter
@RequiredArgsConstructor
public class CatalogSchemaName {
public class DataSourceSchemaName {

private final String catalogName;
private final String dataSourceName;

private final String schemaName;

Expand Down
67 changes: 31 additions & 36 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
Expand All @@ -37,7 +29,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -69,10 +61,10 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
Expand Down Expand Up @@ -101,7 +93,7 @@
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;

Expand All @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final NamedExpressionAnalyzer namedExpressionAnalyzer;

private final CatalogService catalogService;
private final DataSourceService dataSourceService;

private final BuiltinFunctionRepository repository;

Expand All @@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService,
DataSourceService dataSourceService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
this.dataSourceService = dataSourceService;
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
this.repository = repository;
Expand All @@ -142,25 +134,27 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName();
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
context.push();
TypeEnvironment curEnv = context.peek();
Table table;
if (CATALOGS_TABLE_NAME.equals(tableName)) {
table = new CatalogTable(catalogService);
if (DATASOURCES_TABLE_NAME.equals(tableName)) {
table = new DataSourceTable(dataSourceService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
table = dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine()
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
catalogSchemaIdentifierNameResolver.getSchemaName()),
catalogSchemaIdentifierNameResolver.getIdentifierName());
.getTable(new DataSourceSchemaName(
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
}
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

Expand Down Expand Up @@ -188,28 +182,29 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
allowedDataSourceNames);

FunctionName functionName
= FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName());
= FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName());
List<Expression> arguments = node.getArguments().stream()
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
curEnv.define(new Symbol(Namespace.INDEX_NAME,
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,53 @@
import java.util.List;
import java.util.Set;

public class CatalogSchemaIdentifierNameResolver {
public class DataSourceSchemaIdentifierNameResolver {

public static final String DEFAULT_CATALOG_NAME = "@opensearch";
public static final String DEFAULT_DATASOURCE_NAME = "@opensearch";
public static final String DEFAULT_SCHEMA_NAME = "default";
public static final String INFORMATION_SCHEMA_NAME = "information_schema";

private String catalogName = DEFAULT_CATALOG_NAME;
private String dataSourceName = DEFAULT_DATASOURCE_NAME;
private String schemaName = DEFAULT_SCHEMA_NAME;
private String identifierName;

private static final String DOT = ".";

/**
* Data model for capturing catalog, schema and identifier from
* Data model for capturing dataSourceName, schema and identifier from
* fully qualifiedName. In the current state, it is used to capture
* CatalogSchemaTable name and CatalogSchemaFunction in case of table
* DataSourceSchemaTable name and DataSourceSchemaFunction in case of table
* functions.
*
* @param parts parts of qualifiedName.
* @param allowedCatalogs allowedCatalogs.
* @param allowedDataSources allowedDataSources.
*/
public CatalogSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
List<String> remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs));
public DataSourceSchemaIdentifierNameResolver(List<String> parts,
Set<String> allowedDataSources) {
List<String> remainingParts
= captureSchemaName(captureDataSourceName(parts, allowedDataSources));
identifierName = String.join(DOT, remainingParts);
}

public String getIdentifierName() {
return identifierName;
}

public String getCatalogName() {
return catalogName;
public String getDataSourceName() {
return dataSourceName;
}

public String getSchemaName() {
return schemaName;
}


// Capture catalog name and return remaining parts(schema name and table name)
// Capture datasource name and return remaining parts(schema name and table name)
// from the fully qualified name.
private List<String> captureCatalogName(List<String> parts, Set<String> allowedCatalogs) {
if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0))
|| DEFAULT_CATALOG_NAME.equals(parts.get(0))) {
catalogName = parts.get(0);
private List<String> captureDataSourceName(List<String> parts, Set<String> allowedDataSources) {
if (parts.size() > 1 && allowedDataSources.contains(parts.get(0))
|| DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) {
dataSourceName = parts.get(0);
return parts.subList(1, parts.size());
} else {
return parts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String getAlias() {

/**
* Get Qualified name preservs parts of the user given identifiers.
* This can later be utilized to determine Catalog,Schema and Table Name during
* This can later be utilized to determine DataSource,Schema and Table Name during
* Analyzer stage. So Passing QualifiedName directly to Analyzer Stage.
*
* @return TableQualifiedName.
Expand Down
40 changes: 0 additions & 40 deletions core/src/main/java/org/opensearch/sql/catalog/CatalogService.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.storage.StorageEngine;

/**
* DataSource Service manages datasources.
*/
public interface DataSourceService {

/**
* Returns all datasource objects.
*
* @return DataSource datasources.
*/
Set<DataSource> getDataSources();

/**
* Returns DataSource with corresponding to the datasource name.
*
* @param dataSourceName Name of the datasource.
* @return DataSource datasource.
*/
DataSource getDataSource(String dataSourceName);

/**
* Default opensearch engine is not defined in datasources config.
* So the registration of default datasource happens separately.
*
* @param storageEngine StorageEngine.
*/
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

public enum ConnectorType {
PROMETHEUS,OPENSEARCH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -15,7 +15,7 @@
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class Catalog {
public class DataSource {

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;
package org.opensearch.sql.datasource.model;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand All @@ -15,7 +15,7 @@
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
public class CatalogMetadata {
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;
Expand Down
Loading

0 comments on commit fef20f8

Please sign in to comment.