Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move DataSourceServiceImpl to core module #1084

Merged
merged 3 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,37 @@

import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

/**
* DataSource Service manages datasources.
* DataSource Service manage {@link DataSource}.
*/
public interface DataSourceService {

/**
* Returns all datasource objects.
* Returns all DataSource objects.
*
* @return DataSource datasources.
* @return set of {@link DataSource}.
*/
Set<DataSource> getDataSources();

/**
* Returns DataSource with corresponding to the datasource name.
* Returns {@link DataSource} with corresponding to the DataSource name.
*
* @param dataSourceName Name of the datasource.
* @return DataSource datasource.
* @param dataSourceName Name of the {@link DataSource}.
* @return {@link DataSource}.
*/
DataSource getDataSource(String dataSourceName);

/**
* Default opensearch engine is not defined in datasources config.
* So the registration of default datasource happens separately.
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
* @param storageEngine StorageEngine.
* @param metadatas list of {@link DataSourceMetadata}.
*/
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
void addDataSource(DataSourceMetadata... metadatas);

/**
* remove all the registered {@link DataSource}.
*/
void clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.datasource;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.storage.DataSourceFactory;

/**
* Default implementation of {@link DataSourceService}. It is per-jvm single instance.
*
* <p>{@link DataSourceService} is constructed by the list of {@link DataSourceFactory} at service
* bootstrap time. The set of {@link DataSourceFactory} is immutable. Client could add {@link
* DataSource} defined by {@link DataSourceMetadata} at any time. {@link DataSourceService} use
* {@link DataSourceFactory} to create {@link DataSource}.
*/
public class DataSourceServiceImpl implements DataSourceService {
penghuo marked this conversation as resolved.
Show resolved Hide resolved

private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*";

private final ConcurrentHashMap<String, DataSource> dataSourceMap;

private final Map<DataSourceType, DataSourceFactory> dataSourceFactoryMap;

/**
* Construct from the set of {@link DataSourceFactory} at bootstrap time.
*/
public DataSourceServiceImpl(Set<DataSourceFactory> dataSourceFactories) {
dataSourceFactoryMap =
dataSourceFactories.stream()
.collect(Collectors.toMap(DataSourceFactory::getDataSourceType, f -> f));
dataSourceMap = new ConcurrentHashMap<>();
}

@Override
public Set<DataSource> getDataSources() {
return Set.copyOf(dataSourceMap.values());
}

@Override
public DataSource getDataSource(String dataSourceName) {
YANG-DB marked this conversation as resolved.
Show resolved Hide resolved
if (!dataSourceMap.containsKey(dataSourceName)) {
throw new IllegalArgumentException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
}
return dataSourceMap.get(dataSourceName);
}

@Override
public void addDataSource(DataSourceMetadata... metadatas) {
for (DataSourceMetadata metadata : metadatas) {
validateDataSourceMetaData(metadata);
dataSourceMap.put(
metadata.getName(),
dataSourceFactoryMap.get(metadata.getConnector()).createDataSource(metadata));
}
}

@Override
public void clear() {
dataSourceMap.clear();
}

/**
* This can be moved to a different validator class when we introduce more connectors.
*
* @param metadata {@link DataSourceMetadata}.
*/
private void validateDataSourceMetaData(DataSourceMetadata metadata) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(metadata.getName()),
"Missing Name Field from a DataSource. Name is a required parameter.");
Preconditions.checkArgument(
!dataSourceMap.containsKey(metadata.getName()),
StringUtils.format(
"Datasource name should be unique, Duplicate datasource found %s.",
metadata.getName()));
Preconditions.checkArgument(
metadata.getName().matches(DATASOURCE_NAME_REGEX),
StringUtils.format(
"DataSource Name: %s contains illegal characters. Allowed characters: a-zA-Z0-9_-*@.",
metadata.getName()));
Preconditions.checkArgument(
!Objects.isNull(metadata.getProperties()),
"Missing properties field in catalog configuration. Properties are required parameters.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.storage.StorageEngine;

/**
* Each user configured datasource mapping to one instance of DataSource per JVM.
*/
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode
public class DataSource {

private final String name;

private final ConnectorType connectorType;
private final DataSourceType connectorType;

@EqualsAndHashCode.Exclude
private final StorageEngine storageEngine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,44 @@

package org.opensearch.sql.datasource.model;


import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.sql.datasource.DataSourceService;

@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode
public class DataSourceMetadata {

@JsonProperty(required = true)
private String name;

@JsonProperty(required = true)
@JsonFormat(with = JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)
private ConnectorType connector;
private DataSourceType connector;

@JsonProperty(required = true)
private Map<String, String> properties;

/**
* Default OpenSearch {@link DataSourceMetadata}. Which is used to register default OpenSearch
* {@link DataSource} to {@link DataSourceService}.
*/
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
YANG-DB marked this conversation as resolved.
Show resolved Hide resolved
DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
dataSourceMetadata.setName(DEFAULT_DATASOURCE_NAME);
dataSourceMetadata.setConnector(DataSourceType.OPENSEARCH);
dataSourceMetadata.setProperties(ImmutableMap.of());
return dataSourceMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

package org.opensearch.sql.datasource.model;

public enum ConnectorType {
public enum DataSourceType {
PROMETHEUS,OPENSEARCH
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.storage;

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

/**
* {@link DataSourceFactory} is used to create {@link DataSource} from {@link DataSourceMetadata}.
* Each data source define {@link DataSourceFactory} and register to {@link DataSourceService}.
* {@link DataSourceFactory} is one instance per JVM . Each {@link DataSourceType} mapping to one
* {@link DataSourceFactory}.
*/
public interface DataSourceFactory {
/**
* Get {@link DataSourceType}.
*/
DataSourceType getDataSourceType();

/**
* Create {@link DataSource}.
*/
DataSource createDataSource(DataSourceMetadata metadata);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.opensearch.sql.config.TestConfig;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.ConnectorType;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.env.Environment;
Expand Down Expand Up @@ -144,9 +144,7 @@ protected Environment<Expression, ExprType> typeEnv() {
@Bean
protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
StorageEngine storageEngine,
Table table) {
dataSourceService.registerDefaultOpenSearchDataSource(storageEngine);
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

Expand Down Expand Up @@ -195,7 +193,7 @@ private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", ConnectorType.PROMETHEUS, storageEngine);
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);


@Override
Expand All @@ -209,8 +207,13 @@ public DataSource getDataSource(String dataSourceName) {
}

@Override
public void registerDefaultOpenSearchDataSource(StorageEngine storageEngine) {
this.storageEngine = storageEngine;
public void addDataSource(DataSourceMetadata... metadatas) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}
}

Expand Down
Loading