diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 6ef3921b39..ae1950d81c 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -40,11 +40,7 @@ public enum Key { SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), - SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"), - SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"), - RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"), - AUTO_INDEX_MANAGEMENT_ENABLED( - "plugins.query.executionengine.spark.auto_index_management.enabled"); + SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"); @Getter private final String keyValue; diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index 9e47f9b37e..866e9cadef 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -16,7 +16,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; +import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -25,24 +25,11 @@ @Getter @Setter +@AllArgsConstructor @EqualsAndHashCode @JsonIgnoreProperties(ignoreUnknown = true) public class DataSourceMetadata { - public static final String DEFAULT_RESULT_INDEX = "query_execution_result"; - public static final int MAX_RESULT_INDEX_NAME_SIZE = 255; - // OS doesn’t allow uppercase: 
https://tinyurl.com/yse2xdbx - public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+"; - public static String INVALID_RESULT_INDEX_NAME_SIZE = - "Result index name size must contains less than " - + MAX_RESULT_INDEX_NAME_SIZE - + " characters"; - public static String INVALID_CHAR_IN_RESULT_INDEX_NAME = - "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and" - + " _(underscore)"; - public static String INVALID_RESULT_INDEX_PREFIX = - "Result index must start with " + DEFAULT_RESULT_INDEX; - @JsonProperty private String name; @JsonProperty private String description; @@ -57,31 +44,18 @@ public class DataSourceMetadata { @JsonProperty private String resultIndex; - public static Function DATASOURCE_TO_RESULT_INDEX = - datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName); - public DataSourceMetadata( String name, - String description, DataSourceType connector, List allowedRoles, Map properties, String resultIndex) { this.name = name; - String errorMessage = validateCustomResultIndex(resultIndex); - if (errorMessage != null) { - throw new IllegalArgumentException(errorMessage); - } - if (resultIndex == null) { - this.resultIndex = fromNameToCustomResultIndex(); - } else { - this.resultIndex = resultIndex; - } - this.connector = connector; - this.description = description; + this.description = StringUtils.EMPTY; this.properties = properties; this.allowedRoles = allowedRoles; + this.resultIndex = resultIndex; } public DataSourceMetadata() { @@ -97,56 +71,9 @@ public DataSourceMetadata() { public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { return new DataSourceMetadata( DEFAULT_DATASOURCE_NAME, - StringUtils.EMPTY, DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of(), null); } - - public String validateCustomResultIndex(String resultIndex) { - if (resultIndex == null) { - return null; - } - if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { - return 
INVALID_RESULT_INDEX_NAME_SIZE; - } - if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) { - return INVALID_CHAR_IN_RESULT_INDEX_NAME; - } - if (resultIndex != null && !resultIndex.startsWith(DEFAULT_RESULT_INDEX)) { - return INVALID_RESULT_INDEX_PREFIX; - } - return null; - } - - /** - * Since we are using datasource name to create result index, we need to make sure that the final - * name is valid - * - * @param resultIndex result index name - * @return valid result index name - */ - private String convertToValidResultIndex(String resultIndex) { - // Limit Length - if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { - resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE); - } - - // Pattern Matching: Remove characters that don't match the pattern - StringBuilder validChars = new StringBuilder(); - for (char c : resultIndex.toCharArray()) { - if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) { - validChars.append(c); - } - } - return validChars.toString(); - } - - public String fromNameToCustomResultIndex() { - if (name == null) { - throw new IllegalArgumentException("Datasource name cannot be null"); - } - return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase())); - } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index bfd68ee53a..569cdd96f8 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -19,7 +19,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; @@ -198,7 +197,6 @@ public Set getDataSourceMetadata(boolean isDefaultDataSource ds -> new DataSourceMetadata( 
ds.getName(), - StringUtils.EMPTY, ds.getConnectorType(), Collections.emptyList(), ImmutableMap.of(), diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index 0c9449e824..4aefc5521d 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -18,7 +18,6 @@ import java.util.LinkedHashMap; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -63,7 +62,6 @@ void testIterator() { dataSource -> new DataSourceMetadata( dataSource.getName(), - StringUtils.EMPTY, dataSource.getConnectorType(), Collections.emptyList(), ImmutableMap.of(), diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java index bf88302833..c62e586dae 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -383,7 +382,6 @@ void testRemovalOfAuthorizationInfo() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, @@ -409,7 +407,6 @@ void 
testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, @@ -437,7 +434,6 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testGlue", - StringUtils.EMPTY, DataSourceType.S3GLUE, Collections.singletonList("glue_access"), properties, @@ -502,7 +498,6 @@ void testGetRawDataSourceMetadata() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java index 5a1f5e155f..e1e442d12b 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java @@ -44,7 +44,6 @@ public void testToDataSourceMetadataFromJson() { dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); - dataSourceMetadata.setResultIndex("query_execution_result2"); Gson gson = new Gson(); String json = gson.toJson(dataSourceMetadata); diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index 92c1a4df16..ff36d2a887 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -21,7 +21,6 @@ import java.util.List; import 
java.util.Map; import lombok.SneakyThrows; -import org.apache.commons.lang3.StringUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; @@ -104,7 +103,6 @@ public void updateDataSourceAPITest() { DataSourceMetadata createDSM = new DataSourceMetadata( "update_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), @@ -118,7 +116,6 @@ public void updateDataSourceAPITest() { DataSourceMetadata updateDSM = new DataSourceMetadata( "update_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"), @@ -178,7 +175,6 @@ public void deleteDataSourceTest() { DataSourceMetadata createDSM = new DataSourceMetadata( "delete_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), @@ -218,7 +214,6 @@ public void getAllDataSourceTest() { DataSourceMetadata createDSM = new DataSourceMetadata( "get_all_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java index d916bfc4db..7b694ce222 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; -import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -45,7 +44,6 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new 
DataSourceMetadata( "my_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index 10fe13a8db..b81b7f9517 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -56,7 +56,6 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new DataSourceMetadata( "my_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java index b6a34d5c41..c3d2bf5912 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; -import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -45,7 +44,6 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new DataSourceMetadata( "my_prometheus", - StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index d041eb386e..6554ef7f61 100644 --- 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -6,7 +6,6 @@ package org.opensearch.sql.opensearch.setting; import static org.opensearch.common.settings.Settings.EMPTY; -import static org.opensearch.common.unit.TimeValue.timeValueDays; import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY; import com.google.common.annotations.VisibleForTesting; @@ -26,7 +25,6 @@ import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.MemorySizeValue; -import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.common.setting.LegacySettings; import org.opensearch.sql.common.setting.Settings; @@ -151,27 +149,6 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SESSION_INDEX_TTL_SETTING = - Setting.positiveTimeSetting( - Key.SESSION_INDEX_TTL.getKeyValue(), - timeValueDays(14), - Setting.Property.NodeScope, - Setting.Property.Dynamic); - - public static final Setting RESULT_INDEX_TTL_SETTING = - Setting.positiveTimeSetting( - Key.RESULT_INDEX_TTL.getKeyValue(), - timeValueDays(60), - Setting.Property.NodeScope, - Setting.Property.Dynamic); - - public static final Setting AUTO_INDEX_MANAGEMENT_ENABLED_SETTING = - Setting.boolSetting( - Key.AUTO_INDEX_MANAGEMENT_ENABLED.getKeyValue(), - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic); - /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. 
*/ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -254,24 +231,6 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SPARK_EXECUTION_SESSION_LIMIT, SPARK_EXECUTION_SESSION_LIMIT_SETTING, new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); - register( - settingBuilder, - clusterSettings, - Key.SESSION_INDEX_TTL, - SESSION_INDEX_TTL_SETTING, - new Updater(Key.SESSION_INDEX_TTL)); - register( - settingBuilder, - clusterSettings, - Key.RESULT_INDEX_TTL, - RESULT_INDEX_TTL_SETTING, - new Updater(Key.RESULT_INDEX_TTL)); - register( - settingBuilder, - clusterSettings, - Key.AUTO_INDEX_MANAGEMENT_ENABLED, - AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, - new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED)); registerNonDynamicSettings( settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING); defaultSettings = settingBuilder.build(); @@ -339,9 +298,6 @@ public static List> pluginSettings() { .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) - .add(SESSION_INDEX_TTL_SETTING) - .add(RESULT_INDEX_TTL_SETTING) - .add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING) .build(); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 905c697e5b..3d9740d84c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableSet; import java.security.AccessController; import java.security.PrivilegedAction; -import java.time.Clock; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -90,7 +89,6 @@ import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.client.EMRServerlessClient; import 
org.opensearch.sql.spark.client.EmrServerlessClientImpl; -import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl; @@ -247,18 +245,7 @@ public Collection createComponents( }); injector = modules.createInjector(); - ClusterManagerEventListener clusterManagerEventListener = - new ClusterManagerEventListener( - clusterService, - threadPool, - client, - Clock.systemUTC(), - OpenSearchSettings.SESSION_INDEX_TTL_SETTING, - OpenSearchSettings.RESULT_INDEX_TTL_SETTING, - OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, - environment.settings()); - return ImmutableList.of( - dataSourceService, asyncQueryExecutorService, clusterManagerEventListener); + return ImmutableList.of(dataSourceService, asyncQueryExecutorService); } @Override diff --git a/spark/build.gradle b/spark/build.gradle index ed91b9820b..8f4388495e 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -122,11 +122,7 @@ jacocoTestCoverageVerification { // ignore because XContext IOException 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.execution.session.SessionModel', - 'org.opensearch.sql.spark.execution.statement.StatementModel', - // TODO: add tests for purging flint indices - 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*', - 'org.opensearch.sql.spark.cluster.FlintIndexRetention', - 'org.opensearch.sql.spark.cluster.IndexCleanup' + 'org.opensearch.sql.spark.execution.statement.StatementModel' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java index 87f35bbc1e..4e66cd9a00 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java +++ 
b/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.client; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; @@ -74,7 +74,7 @@ void runEmrApplication(String query) { flint.getFlintIntegrationJar(), sparkApplicationJar, query, - DEFAULT_RESULT_INDEX, + SPARK_RESPONSE_BUFFER_INDEX_NAME, flint.getFlintHost(), flint.getFlintPort(), flint.getFlintScheme(), diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 0da5ae7211..335f3b6fc8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.client; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; import com.amazonaws.services.emrserverless.AWSEMRServerless; @@ -36,7 +36,7 @@ public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { public String startJobRun(StartJobRequest startJobRequest) { String resultIndex = startJobRequest.getResultIndex() == null - ? DEFAULT_RESULT_INDEX + ? 
SPARK_RESPONSE_BUFFER_INDEX_NAME : startJobRequest.getResultIndex(); StartJobRunRequest request = new StartJobRunRequest() diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java deleted file mode 100644 index 3d004b548f..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.cluster; - -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; - -import com.google.common.annotations.VisibleForTesting; -import java.time.Clock; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import org.opensearch.client.Client; -import org.opensearch.cluster.LocalNodeClusterManagerListener; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.lifecycle.LifecycleListener; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.threadpool.Scheduler.Cancellable; -import org.opensearch.threadpool.ThreadPool; - -public class ClusterManagerEventListener implements LocalNodeClusterManagerListener { - - private Cancellable flintIndexRetentionCron; - private ClusterService clusterService; - private ThreadPool threadPool; - private Client client; - private Clock clock; - private Duration sessionTtlDuration; - private Duration resultTtlDuration; - private boolean isAutoIndexManagementEnabled; - - public ClusterManagerEventListener( - ClusterService clusterService, - ThreadPool threadPool, - Client client, - Clock clock, - Setting sessionTtl, - Setting resultTtl, - Setting 
isAutoIndexManagementEnabledSetting, - Settings settings) { - this.clusterService = clusterService; - this.threadPool = threadPool; - this.client = client; - this.clusterService.addLocalNodeClusterManagerListener(this); - this.clock = clock; - - this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); - this.resultTtlDuration = toDuration(resultTtl.get(settings)); - - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - sessionTtl, - it -> { - this.sessionTtlDuration = toDuration(it); - cancel(flintIndexRetentionCron); - reInitializeFlintIndexRetention(); - }); - - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - resultTtl, - it -> { - this.resultTtlDuration = toDuration(it); - cancel(flintIndexRetentionCron); - reInitializeFlintIndexRetention(); - }); - - isAutoIndexManagementEnabled = isAutoIndexManagementEnabledSetting.get(settings); - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - isAutoIndexManagementEnabledSetting, - it -> { - if (isAutoIndexManagementEnabled != it) { - this.isAutoIndexManagementEnabled = it; - if (it) { - onClusterManager(); - } else { - offClusterManager(); - } - } - }); - } - - @Override - public void onClusterManager() { - - if (isAutoIndexManagementEnabled && flintIndexRetentionCron == null) { - reInitializeFlintIndexRetention(); - - clusterService.addLifecycleListener( - new LifecycleListener() { - @Override - public void beforeStop() { - cancel(flintIndexRetentionCron); - flintIndexRetentionCron = null; - } - }); - } - } - - private void reInitializeFlintIndexRetention() { - IndexCleanup indexCleanup = new IndexCleanup(client, clusterService); - flintIndexRetentionCron = - threadPool.scheduleWithFixedDelay( - new FlintIndexRetention( - sessionTtlDuration, - resultTtlDuration, - clock, - indexCleanup, - SPARK_REQUEST_BUFFER_INDEX_NAME + "*", - DataSourceMetadata.DEFAULT_RESULT_INDEX + "*"), - TimeValue.timeValueHours(24), - executorName()); - } - - @Override - public 
void offClusterManager() { - cancel(flintIndexRetentionCron); - flintIndexRetentionCron = null; - } - - private void cancel(Cancellable cron) { - if (cron != null) { - cron.cancel(); - } - } - - @VisibleForTesting - public List getFlintIndexRetentionCron() { - return Arrays.asList(flintIndexRetentionCron); - } - - private String executorName() { - return ThreadPool.Names.GENERIC; - } - - public static Duration toDuration(TimeValue timeValue) { - return Duration.ofMillis(timeValue.millis()); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java deleted file mode 100644 index 3ca56ca173..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.cluster; - -import static org.opensearch.sql.spark.execution.session.SessionModel.LAST_UPDATE_TIME; -import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME; - -import java.time.Clock; -import java.time.Duration; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.CheckedConsumer; -import org.opensearch.common.time.FormatNames; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.IndexNotFoundException; -import org.opensearch.index.query.QueryBuilders; - -public class FlintIndexRetention implements Runnable { - private static final Logger LOG = LogManager.getLogger(FlintIndexRetention.class); - - static final String SESSION_INDEX_NOT_EXIST_MSG = "Checkpoint index does not exist."; - - static final String RESULT_INDEX_NOT_EXIST_MSG = "Result index does not exist."; - - // timestamp field in result index - static final String UPDATE_TIME_FIELD = "updateTime"; - - private final Duration 
defaultSessionTtl; - private final Duration defaultResultTtl; - private final Clock clock; - private final IndexCleanup indexCleanup; - private final String sessionIndexNameRegex; - private final String resultIndexNameRegex; - - public FlintIndexRetention( - Duration defaultSessionTtl, - Duration defaultResultTtl, - Clock clock, - IndexCleanup indexCleanup, - String sessionIndexNameRegex, - String resultIndexNameRegex) { - this.defaultSessionTtl = defaultSessionTtl; - this.defaultResultTtl = defaultResultTtl; - this.clock = clock; - this.indexCleanup = indexCleanup; - this.sessionIndexNameRegex = sessionIndexNameRegex; - this.resultIndexNameRegex = resultIndexNameRegex; - } - - @Override - public void run() { - purgeSessionIndex(); - } - - private void purgeSessionIndex() { - purgeIndex( - sessionIndexNameRegex, - defaultSessionTtl, - LAST_UPDATE_TIME, - this::handleSessionPurgeResponse, - this::handleSessionPurgeError); - } - - private void handleSessionPurgeResponse(Long response) { - purgeStatementIndex(); - } - - private void handleSessionPurgeError(Exception exception) { - handlePurgeError(SESSION_INDEX_NOT_EXIST_MSG, "session index", exception); - purgeStatementIndex(); - } - - private void purgeStatementIndex() { - purgeIndex( - sessionIndexNameRegex, - defaultSessionTtl, - SUBMIT_TIME, - this::handleStatementPurgeResponse, - this::handleStatementPurgeError); - } - - private void handleStatementPurgeResponse(Long response) { - purgeResultIndex(); - } - - private void handleStatementPurgeError(Exception exception) { - handlePurgeError(SESSION_INDEX_NOT_EXIST_MSG, "session index", exception); - purgeResultIndex(); - } - - private void purgeResultIndex() { - purgeIndex( - resultIndexNameRegex, - defaultResultTtl, - UPDATE_TIME_FIELD, - this::handleResultPurgeResponse, - this::handleResultPurgeError); - } - - private void handleResultPurgeResponse(Long response) { - LOG.debug("purge result index done"); - } - - private void handleResultPurgeError(Exception 
exception) { - handlePurgeError(RESULT_INDEX_NOT_EXIST_MSG, "result index", exception); - } - - private void handlePurgeError(String notExistMsg, String indexType, Exception exception) { - if (exception instanceof IndexNotFoundException) { - LOG.debug(notExistMsg); - } else { - LOG.error("delete docs by query fails for " + indexType, exception); - } - } - - private void purgeIndex( - String indexName, - Duration ttl, - String timeStampField, - CheckedConsumer successHandler, - CheckedConsumer errorHandler) { - indexCleanup.deleteDocsByQuery( - indexName, - QueryBuilders.boolQuery() - .filter( - QueryBuilders.rangeQuery(timeStampField) - .lte(clock.millis() - ttl.toMillis()) - .format(FormatNames.EPOCH_MILLIS.getSnakeCaseName())), - ActionListener.wrap( - response -> { - try { - successHandler.accept(response); - } catch (Exception e) { - LOG.error("Error handling response for index " + indexName, e); - } - }, - ex -> { - try { - errorHandler.accept(ex); - } catch (Exception e) { - LOG.error("Error handling error for index " + indexName, e); - } - })); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java deleted file mode 100644 index 562f12b69e..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.cluster; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.support.IndicesOptions; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.reindex.DeleteByQueryAction; -import 
org.opensearch.index.reindex.DeleteByQueryRequest; - -/** Clean up the old docs for indices. */ -public class IndexCleanup { - private static final Logger LOG = LogManager.getLogger(IndexCleanup.class); - - private final Client client; - private final ClusterService clusterService; - - public IndexCleanup(Client client, ClusterService clusterService) { - this.client = client; - this.clusterService = clusterService; - } - - /** - * Delete docs based on query request - * - * @param indexName index name - * @param queryForDeleteByQueryRequest query request - * @param listener action listener - */ - public void deleteDocsByQuery( - String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener listener) { - DeleteByQueryRequest deleteRequest = - new DeleteByQueryRequest(indexName) - .setQuery(queryForDeleteByQueryRequest) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .setRefresh(true); - - try (ThreadContext.StoredContext context = - client.threadPool().getThreadContext().stashContext()) { - client.execute( - DeleteByQueryAction.INSTANCE, - deleteRequest, - ActionListener.wrap( - response -> { - long deleted = response.getDeleted(); - if (deleted > 0) { - // if 0 docs get deleted, it means our query cannot find any matching doc - // or the index does not exist at all - LOG.info("{} docs are deleted for index:{}", deleted, indexName); - } - listener.onResponse(response.getDeleted()); - }, - listener::onFailure)); - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 3a243cb5b3..e8659c680c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -20,6 +20,7 @@ public class SparkConstants { // EMR-S will download JAR to local maven public static final String SPARK_SQL_APPLICATION_JAR = 
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; + public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result"; public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request"; // TODO should be replaced with mvn jar. public static final String FLINT_INTEGRATION_JAR = diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index e4773310f0..2614992463 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -5,9 +5,9 @@ package org.opensearch.sql.spark.response; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,7 +45,7 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { SearchRequest searchRequest = new SearchRequest(); - String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; + String searchResultIndex = resultIndex == null ? 
SPARK_RESPONSE_BUFFER_INDEX_NAME : resultIndex; searchRequest.indices(searchResultIndex); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java b/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java index e225804043..496caba2c9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.response; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import com.google.common.annotations.VisibleForTesting; import lombok.Data; @@ -51,7 +51,7 @@ public JSONObject getResultFromOpensearchIndex() { private JSONObject searchInSparkIndex(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DEFAULT_RESULT_INDEX); + searchRequest.indices(SPARK_RESPONSE_BUFFER_INDEX_NAME); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); @@ -65,7 +65,7 @@ private JSONObject searchInSparkIndex(QueryBuilder query) { if (searchResponse.status().getStatus() != 200) { throw new RuntimeException( "Fetching result from " - + DEFAULT_RESULT_INDEX + + SPARK_RESPONSE_BUFFER_INDEX_NAME + " index failed with status : " + searchResponse.status()); } else { @@ -80,7 +80,7 @@ private JSONObject searchInSparkIndex(QueryBuilder query) { @VisibleForTesting void deleteInSparkIndex(String id) { - DeleteRequest deleteRequest = new DeleteRequest(DEFAULT_RESULT_INDEX); + DeleteRequest deleteRequest = new DeleteRequest(SPARK_RESPONSE_BUFFER_INDEX_NAME); deleteRequest.id(id); ActionFuture deleteResponseActionFuture; try { diff --git 
a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 5b04c8f7ea..39ec132442 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -11,6 +11,7 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SESSION_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; @@ -28,7 +29,6 @@ import com.google.common.collect.ImmutableSet; import java.util.*; import lombok.Getter; -import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -112,10 +112,9 @@ public void setup() { clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); dataSourceService = createDataSourceService(); - DataSourceMetadata dataSourceMetadata = + dataSourceService.createDataSource( new DataSourceMetadata( DATASOURCE, - StringUtils.EMPTY, DataSourceType.S3GLUE, ImmutableList.of(), ImmutableMap.of( @@ -127,10 +126,9 @@ public void setup() { "http://localhost:9200", "glue.indexstore.opensearch.auth", "noauth"), - null); - dataSourceService.createDataSource(dataSourceMetadata); + null)); stateStore = new StateStore(client, clusterService); - 
createIndex(dataSourceMetadata.fromNameToCustomResultIndex()); + createIndex(SPARK_RESPONSE_BUFFER_INDEX_NAME); } @After @@ -341,12 +339,7 @@ public void datasourceWithBasicAuth() { dataSourceService.createDataSource( new DataSourceMetadata( - "mybasicauth", - StringUtils.EMPTY, - DataSourceType.S3GLUE, - ImmutableList.of(), - properties, - null)); + "mybasicauth", DataSourceType.S3GLUE, ImmutableList.of(), properties, null)); LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = createAsyncQueryExecutorService(emrsClient); @@ -383,7 +376,7 @@ public void withSessionCreateAsyncQueryFailed() { assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); - // 2. fetch async query result. not result write to DEFAULT_RESULT_INDEX yet. + // 2. fetch async query result. no result written to SPARK_RESPONSE_BUFFER_INDEX_NAME yet. // mock failed statement. StatementModel submitted = statementModel.get(); StatementModel mocked = @@ -503,7 +496,6 @@ public void datasourceNameIncludeUppercase() { dataSourceService.createDataSource( new DataSourceMetadata( "TESTS3", - StringUtils.EMPTY, DataSourceType.S3GLUE, ImmutableList.of(), ImmutableMap.of( diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java index bbaf6f0f59..fefc951dd7 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java @@ -10,8 +10,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static
org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import java.util.Map; import org.apache.lucene.search.TotalHits; @@ -79,7 +79,7 @@ public void testInvalidSearchResponse() { () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); Assertions.assertEquals( "Fetching result from " - + DEFAULT_RESULT_INDEX + + SPARK_RESPONSE_BUFFER_INDEX_NAME + " index failed with status : " + RestStatus.NO_CONTENT, exception.getMessage()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java index bad26a2792..e234454021 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java @@ -9,8 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import java.util.Map; import org.apache.lucene.search.TotalHits; @@ -69,7 +69,7 @@ public void testInvalidSearchResponse() { assertThrows(RuntimeException.class, () -> sparkResponse.getResultFromOpensearchIndex()); Assertions.assertEquals( "Fetching result from " - + DEFAULT_RESULT_INDEX + + SPARK_RESPONSE_BUFFER_INDEX_NAME + " index failed with status : " + RestStatus.NO_CONTENT, exception.getMessage());