Commit

[Backport 2.x] Move classes from spark to async-query-core and async-query (#2737) (#2750)

* Move classes from spark to async-query-core and async-query

Signed-off-by: Tomoyuki Morita <[email protected]>

(cherry picked from commit d5c2fed)

* Fix build.gradle

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit 61091c1)

* Adjust build.gradle

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit ebb07ef)

* Fix copyrights

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit 084a3c8)
ykmr1224 authored Jun 13, 2024
1 parent f1523d5 commit c233ada
Showing 214 changed files with 277 additions and 170 deletions.
29 changes: 21 additions & 8 deletions async-query-core/build.gradle
@@ -28,7 +28,7 @@ tasks.register('downloadG4Files', Exec) {
}

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.asyncquery.antlr.parser']
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/asyncquery/antlr/parser")
}
@@ -44,17 +44,18 @@ generateGrammarSource.dependsOn downloadG4Files
dependencies {
antlr "org.antlr:antlr4:4.7.1"

implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}"
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}"
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}"
implementation project(':core')
implementation project(':spark') // TODO: dependency to spark should be eliminated
implementation project(':datasources') // TODO: dependency to datasources should be eliminated
implementation project(':legacy') // TODO: dependency to legacy should be eliminated
implementation 'org.json:json:20231013'
implementation 'com.google.code.gson:gson:2.8.9'

testImplementation(platform("org.junit:junit-bom:5.9.3"))

testCompileOnly('org.junit.jupiter:junit-jupiter')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.7.0'
testImplementation 'org.mockito:mockito-core:5.7.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.7.0'

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
@@ -108,7 +109,19 @@ jacocoTestCoverageVerification {
violationRules {
rule {
element = 'CLASS'
excludes = []
// TODO: Add unit tests in async-query-core and remove exclusions
excludes = [
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.dispatcher.*',
'org.opensearch.sql.spark.execution.session.*',
'org.opensearch.sql.spark.execution.statement.*',
'org.opensearch.sql.spark.flint.*',
'org.opensearch.sql.spark.flint.operation.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.utils.SQLQueryUtils.*'
]
limit {
counter = 'LINE'
minimum = 1.0
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -6,8 +6,6 @@
package org.opensearch.sql.spark.data.constants;

public class SparkConstants {
public static final String EMR = "emr";
public static final String STEP_ID_FIELD = "stepId.keyword";

public static final String JOB_ID_FIELD = "jobRunId";

@@ -21,16 +19,11 @@ public class SparkConstants {
public static final String SPARK_SQL_APPLICATION_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar";
public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.3.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
public static final String FLINT_DEFAULT_AUTH = "noauth";
public static final String FLINT_DEFAULT_REGION = "us-west-2";
public static final String DEFAULT_CLASS_NAME = "org.apache.spark.sql.FlintJob";
public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY =
"spark.hadoop.fs.s3.customAWSCredentialsProvider";
@@ -33,7 +33,7 @@
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkSubmitParameterModifier;
@@ -114,8 +114,10 @@ void testCreateAsyncQuery() {

@Test
void testCreateAsyncQueryWithExtraSparkSubmitParameter() {
OpenSearchSparkSubmitParameterModifier modifier =
new OpenSearchSparkSubmitParameterModifier("--conf spark.dynamicAllocation.enabled=false");
SparkSubmitParameterModifier modifier =
(SparkSubmitParameters parameters) -> {
parameters.setExtraParameters("--conf spark.dynamicAllocation.enabled=false");
};
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(
SparkExecutionEngineConfig.builder()
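The hunk above replaces the concrete OpenSearchSparkSubmitParameterModifier with a lambda, which only works if SparkSubmitParameterModifier is a functional interface over SparkSubmitParameters. A minimal sketch of that shape, assuming a single method named modifyParameters (the production interface is not part of this diff):

// Hypothetical sketch inferred from the test change above; the method name is an assumption.
@FunctionalInterface
public interface SparkSubmitParameterModifier {
  void modifyParameters(SparkSubmitParameters parameters);
}

// Usage mirroring the test: append an extra --conf flag via a lambda.
SparkSubmitParameterModifier modifier =
    parameters -> parameters.setExtraParameters("--conf spark.dynamicAllocation.enabled=false");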
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.constants;

public class TestConstants {
public static final String QUERY = "select 1";
public static final String EMR_JOB_ID = "job-123xxx";
public static final String EMRS_APPLICATION_ID = "app-xxxxx";
public static final String EMRS_EXECUTION_ROLE = "execution_role";
public static final String EMRS_JOB_NAME = "job_name";
public static final String SPARK_SUBMIT_PARAMETERS = "--conf org.flint.sql.SQLJob";
public static final String TEST_CLUSTER_NAME = "TEST_CLUSTER";
public static final String MOCK_SESSION_ID = "s-0123456";
public static final String MOCK_STATEMENT_ID = "st-0123456";
public static final String ENTRY_POINT_START_JAR =
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.3.0-SNAPSHOT.jar";
public static final String DEFAULT_RESULT_INDEX = "query_execution_result_ds1";
public static final String US_EAST_REGION = "us-east-1";
public static final String US_WEST_REGION = "us-west-1";
}
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.leasemanager;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class ConcurrencyLimitExceededExceptionTest {
@Test
public void test() {
ConcurrencyLimitExceededException e = new ConcurrencyLimitExceededException("Test");

assertEquals("Test", e.getMessage());
}
}
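The test only pins down that the constructor argument is returned by getMessage(). A minimal sketch consistent with that assertion, assuming the exception simply extends RuntimeException (the production class is not shown in this diff):

// Hypothetical sketch; only the message pass-through is asserted by the test above.
package org.opensearch.sql.spark.leasemanager;

public class ConcurrencyLimitExceededException extends RuntimeException {
  public ConcurrencyLimitExceededException(String message) {
    super(message);
  }
}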
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class IDUtilsTest {
public static final String DATASOURCE_NAME = "DATASOURCE_NAME";

@Test
public void encodeAndDecode() {
String id = IDUtils.encode(DATASOURCE_NAME);
String decoded = IDUtils.decode(id);

assertTrue(id.length() > IDUtils.PREFIX_LEN);
assertEquals(DATASOURCE_NAME, decoded);
}

@Test
public void generateUniqueIds() {
String id1 = IDUtils.encode(DATASOURCE_NAME);
String id2 = IDUtils.encode(DATASOURCE_NAME);

assertNotEquals(id1, id2);
}
}
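Taken together, the two tests pin down the IDUtils contract: encode yields an ID longer than PREFIX_LEN, repeated encoding of the same datasource name yields distinct IDs, and decode recovers the original name. A minimal JDK-only sketch that satisfies those assertions, offered as an illustrative assumption rather than the repository's actual implementation:

// Hypothetical sketch: a random prefix provides uniqueness, Base64 keeps the name recoverable.
package org.opensearch.sql.spark.utils;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class IDUtils {
  public static final int PREFIX_LEN = 10;

  public static String encode(String datasourceName) {
    String prefix = UUID.randomUUID().toString().substring(0, PREFIX_LEN);
    return prefix
        + Base64.getEncoder().encodeToString(datasourceName.getBytes(StandardCharsets.UTF_8));
  }

  public static String decode(String id) {
    return new String(
        Base64.getDecoder().decode(id.substring(PREFIX_LEN)), StandardCharsets.UTF_8);
  }
}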
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class RealTimeProviderTest {
@Test
public void testCurrentEpochMillis() {
RealTimeProvider realTimeProvider = new RealTimeProvider();

assertTrue(realTimeProvider.currentEpochMillis() > 0);
}
}
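The assertion only requires currentEpochMillis() to return a positive value, so the class is presumably a thin wrapper over the system clock. A minimal sketch under that assumption (the production class, and any interface it implements, are not shown here):

// Hypothetical sketch; delegates directly to the system clock.
package org.opensearch.sql.spark.utils;

public class RealTimeProvider {
  public long currentEpochMillis() {
    return System.currentTimeMillis();
  }
}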
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.utils;

import java.io.IOException;
import java.util.Objects;

public class TestUtils {
public static String getJson(String filename) throws IOException {
ClassLoader classLoader = TestUtils.class.getClassLoader();
return new String(
Objects.requireNonNull(classLoader.getResourceAsStream(filename)).readAllBytes());
}
}
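Example usage, assuming the requested file sits on the test classpath, as the select_query_response.json fixture added later in this diff does:

// Reads a classpath resource into a String; fails with IOException on read errors
// or a NullPointerException (via Objects.requireNonNull) if the resource is missing.
String json = TestUtils.getJson("select_query_response.json");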
@@ -0,0 +1 @@
mock-maker-inline
12 changes: 12 additions & 0 deletions async-query-core/src/test/resources/select_query_response.json
@@ -0,0 +1,12 @@
{
"data": {
"result": [
"{'1':1}"
],
"schema": [
"{'column_name':'1','data_type':'integer'}"
],
"stepId": "s-123456789",
"applicationId": "application-abc"
}
}
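A test could combine TestUtils.getJson with org.json (which appears in the async-query-core/build.gradle hunk above) to read fields from this fixture; a hedged sketch, since how the repository's tests actually consume the file is not shown here:

// Hypothetical sketch assuming org.json is on the test classpath.
import org.json.JSONObject;

JSONObject response = new JSONObject(TestUtils.getJson("select_query_response.json"));
JSONObject data = response.getJSONObject("data");
String stepId = data.getString("stepId");                     // "s-123456789"
String applicationId = data.getString("applicationId");       // "application-abc"
String firstRow = data.getJSONArray("result").getString(0);   // "{'1':1}"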
21 changes: 6 additions & 15 deletions async-query/build.gradle
@@ -17,7 +17,7 @@ repositories {

dependencies {
api project(':core')
implementation project(':async-query-core')
api project(':async-query-core')
implementation project(':protocol')
implementation project(':datasources')
implementation project(':legacy')
@@ -91,22 +91,13 @@ jacocoTestCoverageVerification {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel',
'org.opensearch.sql.spark.flint.FlintIndexStateModel',
// TODO: add tests for purging flint indices
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*',
'org.opensearch.sql.spark.cluster.FlintIndexRetention',
'org.opensearch.sql.spark.cluster.IndexCleanup'
'org.opensearch.sql.spark.cluster.IndexCleanup',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
counter = 'LINE'
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import lombok.AllArgsConstructor;
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,8 +1,6 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.asyncquery;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
@@ -6,6 +6,7 @@
package org.opensearch.sql.spark.cluster;

import static org.opensearch.sql.datasource.model.DataSourceStatus.DISABLED;
import static org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec.MYGLUE_DATASOURCE;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import static org.mockito.Mockito.when;
