From 204c7daf106de6d8c915ee653c6765b703ee4551 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Tue, 16 Apr 2024 12:01:44 -0700 Subject: [PATCH] Refactoring of SparkQueryDispatcher by removing unnecessary class (#2615) Signed-off-by: Vamsi Manohar --- spark/src/main/antlr/SqlBaseLexer.g4 | 3 ++- spark/src/main/antlr/SqlBaseParser.g4 | 20 ++++++++++--------- .../dispatcher/SparkQueryDispatcher.java | 3 --- .../config/AsyncExecutorServiceModule.java | 12 +---------- .../AsyncQueryExecutorServiceSpec.java | 2 -- .../dispatcher/SparkQueryDispatcherTest.java | 3 --- 6 files changed, 14 insertions(+), 29 deletions(-) diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index 7c376e2268..e2b178d34b 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -129,6 +129,7 @@ CLUSTER: 'CLUSTER'; CLUSTERED: 'CLUSTERED'; CODEGEN: 'CODEGEN'; COLLATE: 'COLLATE'; +COLLATION: 'COLLATION'; COLLECTION: 'COLLECTION'; COLUMN: 'COLUMN'; COLUMNS: 'COLUMNS'; @@ -554,7 +555,7 @@ BRACKETED_COMMENT ; WS - : [ \r\n\t]+ -> channel(HIDDEN) + : [ \t\n\f\r\u000B\u00A0\u1680\u2000\u2001\u2002\u2003\u2004\u2005\u2006\u2007\u2008\u2009\u200A\u2028\u202F\u205F\u3000]+ -> channel(HIDDEN) ; // Catch-all for anything we can't recognize. diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 41a5ec241c..3d00851658 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -76,7 +76,7 @@ statement | ctes? dmlStatementNoWith #dmlStatement | USE identifierReference #use | USE namespace identifierReference #useNamespace - | SET CATALOG (identifier | stringLit) #setCatalog + | SET CATALOG (errorCapturingIdentifier | stringLit) #setCatalog | CREATE namespace (IF NOT EXISTS)? identifierReference (commentSpec | locationSpec | @@ -210,6 +210,7 @@ statement | (MSCK)? REPAIR TABLE identifierReference (option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable | op=(ADD | LIST) identifier .*? #manageResource + | SET COLLATION collationName=identifier #setCollation | SET ROLE .*? #failNativeCommand | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone @@ -392,7 +393,7 @@ describeFuncName ; describeColName - : nameParts+=identifier (DOT nameParts+=identifier)* + : nameParts+=errorCapturingIdentifier (DOT nameParts+=errorCapturingIdentifier)* ; ctes @@ -429,7 +430,7 @@ property ; propertyKey - : identifier (DOT identifier)* + : errorCapturingIdentifier (DOT errorCapturingIdentifier)* | stringLit ; @@ -683,18 +684,18 @@ pivotClause ; pivotColumn - : identifiers+=identifier - | LEFT_PAREN identifiers+=identifier (COMMA identifiers+=identifier)* RIGHT_PAREN + : identifiers+=errorCapturingIdentifier + | LEFT_PAREN identifiers+=errorCapturingIdentifier (COMMA identifiers+=errorCapturingIdentifier)* RIGHT_PAREN ; pivotValue - : expression (AS? identifier)? + : expression (AS? errorCapturingIdentifier)? ; unpivotClause : UNPIVOT nullOperator=unpivotNullClause? LEFT_PAREN operator=unpivotOperator - RIGHT_PAREN (AS? identifier)? + RIGHT_PAREN (AS? errorCapturingIdentifier)? ; unpivotNullClause @@ -736,7 +737,7 @@ unpivotColumn ; unpivotAlias - : AS? identifier + : AS? errorCapturingIdentifier ; lateralView @@ -1188,7 +1189,7 @@ complexColTypeList ; complexColType - : identifier COLON? dataType (NOT NULL)? commentSpec? + : errorCapturingIdentifier COLON? dataType (NOT NULL)? commentSpec? ; whenClause @@ -1662,6 +1663,7 @@ nonReserved | CLUSTERED | CODEGEN | COLLATE + | COLLATION | COLLECTION | COLUMN | COLUMNS diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 2760b30123..c4f4c74868 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -12,7 +12,6 @@ import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -44,8 +43,6 @@ public class SparkQueryDispatcher { private DataSourceService dataSourceService; - private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; - private JobExecutionResponseReader jobExecutionResponseReader; private FlintIndexMetadataService flintIndexMetadataService; diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 2c86a66fb2..9038870c63 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -15,7 +15,6 @@ import org.opensearch.common.inject.Singleton; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.legacy.metrics.GaugeMetric; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; @@ -68,7 +67,6 @@ public StateStore stateStore(NodeClient client, ClusterService clusterService) { public SparkQueryDispatcher sparkQueryDispatcher( EMRServerlessClientFactory emrServerlessClientFactory, DataSourceService dataSourceService, - DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper, JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataServiceImpl flintIndexMetadataReader, NodeClient client, @@ -78,7 +76,6 @@ public SparkQueryDispatcher sparkQueryDispatcher( return new SparkQueryDispatcher( emrServerlessClientFactory, dataSourceService, - dataSourceUserAuthorizationHelper, jobExecutionResponseReader, flintIndexMetadataReader, client, @@ -113,8 +110,7 @@ public SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier(Set @Provides @Singleton - public FlintIndexMetadataServiceImpl flintIndexMetadataReader( - NodeClient client, StateStore stateStore) { + public FlintIndexMetadataServiceImpl flintIndexMetadataReader(NodeClient client) { return new FlintIndexMetadataServiceImpl(client); } @@ -123,12 +119,6 @@ public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client) return new JobExecutionResponseReader(client); } - @Provides - public DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper( - NodeClient client) { - return new DataSourceUserAuthorizationHelperImpl(client); - } - private void registerStateStoreMetrics(StateStore stateStore) { GaugeMetric activeSessionMetric = new GaugeMetric<>( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index d1ca50343f..c4cb96391b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -46,7 +46,6 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; @@ -205,7 +204,6 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new SparkQueryDispatcher( emrServerlessClientFactory, this.dataSourceService, - new DataSourceUserAuthorizationHelperImpl(client), jobExecutionResponseReader, new FlintIndexMetadataServiceImpl(client), client, diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 00c17a7b59..1f250a0aea 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -58,7 +58,6 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -86,7 +85,6 @@ public class SparkQueryDispatcherTest { @Mock private EMRServerlessClientFactory emrServerlessClientFactory; @Mock private DataSourceService dataSourceService; @Mock private JobExecutionResponseReader jobExecutionResponseReader; - @Mock private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock(answer = RETURNS_DEEP_STUBS) @@ -116,7 +114,6 @@ void setUp() { new SparkQueryDispatcher( emrServerlessClientFactory, dataSourceService, - dataSourceUserAuthorizationHelper, jobExecutionResponseReader, flintIndexMetadataService, openSearchClient,