Skip to content

Commit

Permalink
Change CatalogService to DataSourceService
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Nov 17, 2022
1 parent 0504c71 commit fb6970f
Show file tree
Hide file tree
Showing 33 changed files with 953 additions and 502 deletions.
26 changes: 9 additions & 17 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,10 @@
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -69,8 +61,8 @@
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.Catalog;
import org.opensearch.sql.catalog.DataSourceService;
import org.opensearch.sql.catalog.model.DataSource;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.SemanticCheckException;
Expand Down Expand Up @@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>

private final NamedExpressionAnalyzer namedExpressionAnalyzer;

private final CatalogService catalogService;
private final DataSourceService catalogService;

private final BuiltinFunctionRepository repository;

Expand All @@ -126,7 +118,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
*/
public Analyzer(
ExpressionAnalyzer expressionAnalyzer,
CatalogService catalogService,
DataSourceService catalogService,
BuiltinFunctionRepository repository) {
this.expressionAnalyzer = expressionAnalyzer;
this.catalogService = catalogService;
Expand All @@ -142,9 +134,9 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
@Override
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
QualifiedName qualifiedName = node.getTableQualifiedName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedCatalogNames = catalogService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
Expand All @@ -156,7 +148,7 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
table = new CatalogTable(catalogService);
} else {
table = catalogService
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
.getDataSource(catalogSchemaIdentifierNameResolver.getCatalogName())
.getStorageEngine()
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
catalogSchemaIdentifierNameResolver.getSchemaName()),
Expand Down Expand Up @@ -188,9 +180,9 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
@Override
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
QualifiedName qualifiedName = node.getFunctionName();
Set<String> allowedCatalogNames = catalogService.getCatalogs()
Set<String> allowedCatalogNames = catalogService.getDataSources()
.stream()
.map(Catalog::getName)
.map(DataSource::getName)
.collect(Collectors.toSet());
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
Expand Down
40 changes: 0 additions & 40 deletions core/src/main/java/org/opensearch/sql/catalog/CatalogService.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog;

import java.util.Set;
import org.opensearch.sql.catalog.model.DataSource;
import org.opensearch.sql.catalog.model.DataSourceMetadata;

/**
* DataSource Service manage {@link DataSource}.
*/
public interface DataSourceService {

/**
* Returns all DataSource objects.
*
* @return set of {@link DataSource}.
*/
Set<DataSource> getDataSources();

/**
* Returns {@link DataSource} with corresponding to the DataSource name.
*
* @param dataSourceName Name of the {@link DataSource}.
* @return {@link DataSource}.
*/
DataSource getDataSource(String dataSourceName);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void addDataSource(DataSourceMetadata dataSourceMetadata);

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.catalog.model.DataSource;
import org.opensearch.sql.catalog.model.DataSourceMetadata;
import org.opensearch.sql.catalog.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;

/**
* This class manages catalogs and responsible for creating connectors to these catalogs.
*/
public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private static final Logger LOG = LogManager.getLogger();

private final ConcurrentHashMap<String, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

/**
* Constructor.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
}

@Override
public Set<DataSource> getDataSources() {
return new HashSet<>(dataSourceMap.values());
}

@Override
public DataSource getDataSource(String dataSourceName) {
if (!dataSourceMap.containsKey(dataSourceName)) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
}
return dataSourceMap.get(dataSourceName);
}

@Override
public void addDataSource(DataSourceMetadata metadata) {
validateDataSource(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
*
* @param metadata {@link DataSourceMetadata}.
*/
private void validateDataSource(DataSourceMetadata metadata) {
if (StringUtils.isEmpty(metadata.getName())) {
throw new IllegalArgumentException(
"Missing Name Field from a DataSource. Name is a required parameter.");
}
if (!metadata.getName().matches(DATASOURCE_NAME_REGEX)) {
throw new IllegalArgumentException(
String.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
}
if (Objects.isNull(metadata.getProperties())) {
throw new IllegalArgumentException(
"Missing properties field in catalog configuration. Properties are required parameters.");
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class Catalog {
public class DataSource {

private final String name;

private final ConnectorType connectorType;
private final DataSourceType connectorType;

@EqualsAndHashCode.Exclude
private final StorageEngine storageEngine;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.catalog.model;

import static org.opensearch.sql.analysis.CatalogSchemaIdentifierNameResolver.DEFAULT_CATALOG_NAME;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.sql.catalog.DataSourceService;

@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;

@JsonProperty(required = true)
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
private DataSourceType connector;

@JsonProperty(required = true)
private Map<String, String> properties;

/**
* Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
* {@link DataSource} to {@link DataSourceService}.
*/
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName(DEFAULT_CATALOG_NAME);
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setProperties(ImmutableMap.of());
return dataSourceMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.sql.catalog.model;

public enum ConnectorType {
/**
* Supported DataSource Type.
*/
public enum DataSourceType {
PROMETHEUS,OPENSEARCH
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public enum AuthenticationType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.DataSourceService;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.planner.DefaultImplementor;
import org.opensearch.sql.planner.logical.LogicalPlan;
Expand All @@ -30,7 +30,7 @@
@EqualsAndHashCode
public class CatalogTable implements Table {

private final CatalogService catalogService;
private final DataSourceService catalogService;

@Override
public Map<String, ExprType> getFieldTypes() {
Expand All @@ -47,7 +47,7 @@ public PhysicalPlan implement(LogicalPlan plan) {
public static class CatalogTableDefaultImplementor
extends DefaultImplementor<Object> {

private final CatalogService catalogService;
private final DataSourceService catalogService;

@Override
public PhysicalPlan visitRelation(LogicalRelation node, Object context) {
Expand Down
Loading

0 comments on commit fb6970f

Please sign in to comment.