From 014cc97b013deb0e50f843c6a0f1b8ff9b359827 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 6 Oct 2023 04:00:40 +0000 Subject: [PATCH] Fix Unit tests for FlintIndexReader (#2238) Signed-off-by: Vamsi Manohar (cherry picked from commit 55e8e841c8bf69a924bed2c735fc47554c111ac4) Signed-off-by: github-actions[bot] --- spark/build.gradle | 1 - .../flint/FlintIndexMetadataReaderImpl.java | 15 ++-- .../sql/spark/flint/FlintIndexType.java | 2 +- .../FlintIndexMetadataReaderImplTest.java | 77 +++++++++++++++++-- ...lt_http_logs_size_year_covering_index.json | 32 -------- ...lint_mys3_default_http_logs_cv1_index.json | 41 ++++++++++ ...mys3_default_http_logs_skipping_index.json | 41 ++++++++++ .../flint-index-mappings/npe_mapping.json | 35 +++++++++ 8 files changed, 197 insertions(+), 47 deletions(-) delete mode 100644 spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/npe_mapping.json diff --git a/spark/build.gradle b/spark/build.gradle index d506bfbeca..eca0a3ad24 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -92,7 +92,6 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.asyncquery.exceptions.*', 'org.opensearch.sql.spark.dispatcher.model.*', 'org.opensearch.sql.spark.flint.FlintIndexType', - 'org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index 5e1f210d08..d56c57a627 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -12,6 +12,11 @@ @AllArgsConstructor public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { + protected static final String META_KEY = "_meta"; + protected static final String PROPERTIES_KEY = "properties"; + protected static final String ENV_KEY = "env"; + protected static final String JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID"; + private final Client client; @Override @@ -22,12 +27,12 @@ public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) { try { MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName); Map mappingSourceMap = mappingMetadata.getSourceAsMap(); - Map metaMap = (Map) mappingSourceMap.get("_meta"); - Map propertiesMap = (Map) metaMap.get("properties"); - Map envMap = (Map) propertiesMap.get("env"); - return (String) envMap.get("SERVERLESS_EMR_JOB_ID"); + Map metaMap = (Map) mappingSourceMap.get(META_KEY); + Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); + Map envMap = (Map) propertiesMap.get(ENV_KEY); + return (String) envMap.get(JOB_ID_KEY); } catch (NullPointerException npe) { - throw new IllegalArgumentException("Index doesn't exist"); + throw new IllegalArgumentException("Provided Index doesn't exist"); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java index 432370a62f..8922f638e0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java @@ -9,7 +9,7 @@ public enum FlintIndexType { SKIPPING("skipping_index"), COVERING("index"), - MATERIALIZED("materialized_view"); + MATERIALIZED_VIEW("materialized_view"); private final String suffix; diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 5d89a4e4c6..61fabe142a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -12,6 +12,7 @@ import java.util.Map; import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -31,26 +32,86 @@ public class FlintIndexMetadataReaderImplTest { @Mock(answer = RETURNS_DEEP_STUBS) private Client client; - // TODO FIX this @SneakyThrows - // @Test - void testGetJobIdFromFlintIndexMetadata() { + @Test + void testGetJobIdFromFlintSkippingIndexMetadata() { URL url = Resources.getResource( - "flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json"); + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); String mappings = Resources.toString(url, Charsets.UTF_8); - String indexName = "flint_my_glue_default_http_logs_size_year_covering_index"; + String indexName = "flint_mys3_default_http_logs_skipping_index"; mockNodeClientIndicesMappings(indexName, mappings); FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); String jobId = flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( new IndexDetails( - "size_year", - new FullyQualifiedTableName("my_glue.default.http_logs"), + null, + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.SKIPPING)); + Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintCoveringIndexMetadata() { + URL url = + Resources.getResource("flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + String jobId = + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), false, true, FlintIndexType.COVERING)); - Assertions.assertEquals("00fdlum58g9g1g0q", jobId); + Assertions.assertEquals("00fdmvv9hp8u0o0q", jobId); + } + + @SneakyThrows + @Test + void testGetJobIDWithNPEException() { + URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.COVERING))); + Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); + } + + @SneakyThrows + @Test + void testGetJobIdFromUnsupportedIndex() { + FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + flintIndexMetadataReader.getJobIdFromFlintIndexMetadata( + new IndexDetails( + "cv1", + new FullyQualifiedTableName("mys3.default.http_logs"), + false, + true, + FlintIndexType.MATERIALIZED_VIEW))); + Assertions.assertEquals( + "Unsupported Index Type : MATERIALIZED_VIEW", unsupportedOperationException.getMessage()); } @SneakyThrows diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json deleted file mode 100644 index 201aa539bb..0000000000 --- a/spark/src/test/resources/flint-index-mappings/flint_my_glue_default_http_logs_size_year_covering_index.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "mappings": { - "_meta": { - "kind": "skipping", - "indexedColumns": [ - { - "columnType": "int", - "kind": "VALUE_SET", - "columnName": "status" - } - ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", - "properties": { - "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" - } - } - }, - "properties": { - "file_path": { - "type": "keyword" - }, - "status": { - "type": "integer" - } - } - } -} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json new file mode 100644 index 0000000000..e7ca1ff440 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json @@ -0,0 +1,41 @@ +{ + "flint_mys3_default_http_logs_cv1_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_cv1_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json new file mode 100644 index 0000000000..24e14c12ba --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json @@ -0,0 +1,41 @@ +{ + "flint_mys3_default_http_logs_skipping_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_skipping_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/npe_mapping.json b/spark/src/test/resources/flint-index-mappings/npe_mapping.json new file mode 100644 index 0000000000..ff1d19f99f --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/npe_mapping.json @@ -0,0 +1,35 @@ +{ + "flint_mys3_default_http_logs_cv1_index": { + "mappings": { + "_doc": { + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_http_logs_cv1_index", + "options": {}, + "source": "mys3.default.http_logs", + "version": "0.1.0" + } + } + }, + "settings": { + "index": { + "number_of_shards": 5, + "number_of_replicas": 0, + "max_result_window": 100, + "version": { + "created": "6050399" + } + } + }, + "mapping_version": "1", + "settings_version": "1", + "aliases_version": "1" + } +} \ No newline at end of file