Skip to content

Commit

Permalink
Move DataSourceServiceImpl to core module
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Nov 18, 2022
1 parent fef20f8 commit 457b2e2
Show file tree
Hide file tree
Showing 22 changed files with 634 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* DataSource Service manages datasources.
* DataSource Service manage {@link DataSource}.
*/
public interface DataSourceService {

/**
* Returns all datasource objects.
* Returns all DataSource objects.
*
* @return DataSource datasources.
* @return set of {@link DataSource}.
*/
Set<DataSource> getDataSources();

/**
* Returns DataSource with corresponding to the datasource name.
* Returns {@link DataSource} with corresponding to the DataSource name.
*
* @param dataSourceName Name of the datasource.
* @return DataSource datasource.
* @param dataSourceName Name of the {@link DataSource}.
* @return {@link DataSource}.
*/
DataSource getDataSource(String dataSourceName);

/**
* Default opensearch engine is not defined in datasources config.
* So the registration of default datasource happens separately.
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param storageEngine StorageEngine.
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
void addDataSource(DataSourceMetadata dataSourceMetadata);

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;

/**
* Default implementation of {@link DataSourceService}. It is per-jvm single instance.
*
* <p>{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service
* bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link
* DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use
* {@link DataSourceFactory} to create {@link DataSource}.
*/
public class DataSourceServiceImpl implements DataSourceService {

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<String, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

/**
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
}

@Override
public Set<DataSource> getDataSources() {
return new HashSet<>(dataSourceMap.values());
}

@Override
public DataSource getDataSource(String dataSourceName) {
if (!dataSourceMap.containsKey(dataSourceName)) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
}
return dataSourceMap.get(dataSourceName);
}

@Override
public void addDataSource(DataSourceMetadata metadata) {
validateDataSource(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
*
* @param metadata {@link DataSourceMetadata}.
*/
private void validateDataSource(DataSourceMetadata metadata) {
if (StringUtils.isEmpty(metadata.getName())) {
throw new IllegalArgumentException(
"Missing Name Field from a DataSource. Name is a required parameter.");
}
if (!metadata.getName().matches(DATASOURCE_NAME_REGEX)) {
throw new IllegalArgumentException(
String.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
}
if (Objects.isNull(metadata.getProperties())) {
throw new IllegalArgumentException(
"Missing properties field in catalog configuration. Properties are required parameters.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.storage.StorageEngine;

/**
* Each user configured datasource mapping to one instance of DataSource per JVM.
*/
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class DataSource {

private final String name;

private final ConnectorType connectorType;
private final DataSourceType connectorType;

@EqualsAndHashCode.Exclude
private final StorageEngine storageEngine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,44 @@

package org.opensearch.sql.datasource.model;


import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.sql.datasource.DataSourceService;

@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;

@JsonProperty(required = true)
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
private ConnectorType connector;
private DataSourceType connector;

@JsonProperty(required = true)
private Map<String, String> properties;

/**
* Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
* {@link DataSource} to {@link DataSourceService}.
*/
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME);
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setProperties(ImmutableMap.of());
return dataSourceMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

package org.opensearch.sql.datasource.model;

public enum ConnectorType {
public enum DataSourceType {
PROMETHEUS,OPENSEARCH
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.storage;

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

/**
* {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}.
* Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}.
* {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one
* {@link DataSourceFactory}.
*/
public interface DataSourceFactory {
/**
* Get {@link DataSourceType}.
*/
DataSourceType getDataSourceType();

/**
* Create {@link DataSource}.
*/
DataSource createDataSource(DataSourceMetadata metadata);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.opensearch.sql.config.TestConfig;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.ConnectorType;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.env.Environment;
Expand Down Expand Up @@ -144,9 +144,7 @@ protected Environment<Expression, ExprType> typeEnv() {
@Bean
protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
StorageEngine storageEngine,
Table table) {
dataSourceService.registerDefaultOpenSearchDataSource(storageEngine);
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

Expand Down Expand Up @@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine);
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);


@Override
Expand All @@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) {
}

@Override
public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
this.storageEngine = storageEngine;
public void addDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}
}

Expand Down
Loading

0 comments on commit 457b2e2

Please sign in to comment.