From 15c990e4dd710f3a23332a763e03418f1f8b91f5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 18 Jan 2024 09:21:09 -0800 Subject: [PATCH 1/4] remove-refresh-mode-in-refresh-index-api Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 5 +++-- .../FlintSparkCoveringIndexAstBuilder.scala | 4 ++-- ...FlintSparkMaterializedViewAstBuilder.scala | 4 ++-- .../FlintSparkSkippingIndexAstBuilder.scala | 4 ++-- .../FlintSparkCoveringIndexITSuite.scala | 5 ++--- .../spark/FlintSparkIndexMonitorITSuite.scala | 3 +-- .../FlintSparkMaterializedViewITSuite.scala | 5 ++--- .../FlintSparkSkippingIndexITSuite.scala | 21 +++++++++---------- .../spark/FlintSparkTransactionITSuite.scala | 5 ++--- 9 files changed, 26 insertions(+), 30 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 122fea601..43b83d4cd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -135,10 +135,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * refreshing job ID (empty if batch job for now) */ - def refreshIndex(indexName: String, mode: RefreshMode): Option[String] = { - logInfo(s"Refreshing Flint index $indexName with mode $mode") + def refreshIndex(indexName: String): Option[String] = { + logInfo(s"Refreshing Flint index $indexName") val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + val mode = if (index.options.autoRefresh()) INCREMENTAL else FULL try { flintClient diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 9b4816e71..eae401a69 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -53,7 +53,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) - flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) + flint.refreshIndex(flintIndexName) } Seq.empty } @@ -63,7 +63,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A ctx: RefreshCoveringIndexStatementContext): Command = { FlintSparkSqlCommand() { flint => val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName) - flint.refreshIndex(flintIndexName, RefreshMode.FULL) + flint.refreshIndex(flintIndexName) Seq.empty } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 3ab164023..a67803a18 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -46,7 +46,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { val flintIndexName = getFlintIndexName(flint, ctx.mvName) - flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) + flint.refreshIndex(flintIndexName) } Seq.empty } @@ -56,7 +56,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito ctx: RefreshMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => val flintIndexName = getFlintIndexName(flint, ctx.mvName) - flint.refreshIndex(flintIndexName, RefreshMode.FULL) + flint.refreshIndex(flintIndexName) Seq.empty } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 46cf7eebd..893c2b127 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -59,7 +59,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { val indexName = getSkippingIndexName(flint, ctx.tableName) - flint.refreshIndex(indexName, RefreshMode.INCREMENTAL) + flint.refreshIndex(indexName) } Seq.empty } @@ -68,7 +68,7 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A ctx: RefreshSkippingIndexStatementContext): Command = FlintSparkSqlCommand() { flint => val indexName = getSkippingIndexName(flint, ctx.tableName) - flint.refreshIndex(indexName, RefreshMode.FULL) + flint.refreshIndex(indexName) Seq.empty } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index c79069b9b..0d745bbb2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -7,7 +7,6 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.core.FlintVersion.current -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -85,7 +84,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .addIndexColumns("name", "age") .create() - flint.refreshIndex(testFlintIndex, FULL) + flint.refreshIndex(testFlintIndex) val indexData = flint.queryIndex(testFlintIndex) checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) @@ -99,7 +98,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .addIndexColumns("name", "age") .create() - val jobId = flint.refreshIndex(testFlintIndex, INCREMENTAL) + val jobId = flint.refreshIndex(testFlintIndex) jobId shouldBe defined val job = spark.streams.get(jobId.get) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 4af147939..5204d21cc 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -16,7 +16,6 @@ import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.client.RequestOptions import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.spark.FlintSpark.RefreshMode._ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -51,7 +50,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc .addValueSet("name") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - flint.refreshIndex(testFlintIndex, INCREMENTAL) + flint.refreshIndex(testFlintIndex) // Wait for refresh complete and another 5 seconds to make sure monitor thread start val jobId = spark.streams.active.find(_.name == testFlintIndex).get.id.toString diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 6bc85a241..d95e1b5b1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -9,7 +9,6 @@ import java.sql.Timestamp import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.core.FlintVersion.current -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -99,7 +98,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .query(testQuery) .create() - flint.refreshIndex(testFlintIndex, FULL) + flint.refreshIndex(testFlintIndex) val indexData = flint.queryIndex(testFlintIndex) checkAnswer( @@ -209,7 +208,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .create() flint - .refreshIndex(testFlintIndex, INCREMENTAL) + .refreshIndex(testFlintIndex) .map(awaitStreamingComplete) .orElse(throw new RuntimeException) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 9cb4affec..9e9bf6d86 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -8,7 +8,6 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ import org.opensearch.flint.core.FlintVersion.current -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.skipping.FlintSparkSkippingFileIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName @@ -147,7 +146,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addPartitions("year") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) val indexData = flint.queryIndex(testIndex) indexData.columns should not contain ID_COLUMN @@ -161,7 +160,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .create() - val jobId = flint.refreshIndex(testIndex, FULL) + val jobId = flint.refreshIndex(testIndex) jobId shouldBe empty val indexData = flint.queryIndex(testIndex).collect().toSet @@ -176,7 +175,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .create() - val jobId = flint.refreshIndex(testIndex, INCREMENTAL) + val jobId = flint.refreshIndex(testIndex) jobId shouldBe defined val job = spark.streams.get(jobId.get) @@ -195,14 +194,14 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addPartitions("year", "month") .create() - val jobId = flint.refreshIndex(testIndex, INCREMENTAL) + val jobId = flint.refreshIndex(testIndex) val job = spark.streams.get(jobId.get) failAfter(streamingTimeout) { job.processAllAvailable() } assertThrows[IllegalStateException] { - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) } } @@ -254,7 +253,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addPartitions("year", "month") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) // Assert index data checkAnswer( @@ -282,7 +281,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addValueSet("address") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) // Assert index data checkAnswer( @@ -315,7 +314,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addMinMax("age") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) // Assert index data checkAnswer( @@ -388,7 +387,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addPartitions("month") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) // Generate a new source file which is not in index data sql(s""" @@ -634,7 +633,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .addValueSet("varchar_col") .addValueSet("char_col") .create() - flint.refreshIndex(testIndex, FULL) + flint.refreshIndex(testIndex) val query = sql(s""" | SELECT varchar_col, char_col diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 643a35516..534d733c5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -16,7 +16,6 @@ import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -77,7 +76,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .onTable(testTable) .addPartitions("year", "month") .create() - flint.refreshIndex(testFlintIndex, FULL) + flint.refreshIndex(testFlintIndex) val latest = latestLogEntry(testLatestId) latest should contain("state" -> "active") @@ -91,7 +90,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .addPartitions("year", "month") .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() - flint.refreshIndex(testFlintIndex, INCREMENTAL) + flint.refreshIndex(testFlintIndex) // Job start time should be assigned var latest = latestLogEntry(testLatestId) From 2d7b1747ffd2c10cfc9e4e98f7bf6cd0136b5868 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 18 Jan 2024 10:30:49 -0800 Subject: [PATCH 2/4] Rename refresh mode to manual and auto Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 43b83d4cd..d6e83a47e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -12,7 +12,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY -import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} +import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL, RefreshMode} import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView @@ -139,7 +139,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Refreshing Flint index $indexName") val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - val mode = if (index.options.autoRefresh()) INCREMENTAL else FULL + val mode = if (index.options.autoRefresh()) AUTO else MANUAL try { flintClient @@ -149,7 +149,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) .finalLog(latest => { // Change state to active if full, otherwise update index state regularly - if (mode == FULL) { + if (mode == MANUAL) { logInfo("Updating index state to active") latest.copy(state = ACTIVE) } else { @@ -293,7 +293,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintIndexMonitor.startMonitor(indexName) latest.copy(state = REFRESHING) }) - .commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL)) + .commit(_ => doRefreshIndex(index.get, indexName, AUTO)) logInfo("Recovery complete") true @@ -344,17 +344,17 @@ class FlintSpark(val spark: SparkSession) extends Logging { } val jobId = mode match { - case FULL if isIncrementalRefreshing(indexName) => + case MANUAL if isIncrementalRefreshing(indexName) => throw new IllegalStateException( s"Index $indexName is incremental refreshing and cannot be manual refreshed") - case FULL => + case MANUAL => logInfo("Start refreshing index in batch style") batchRefresh() None // Flint index has specialized logic and capability for incremental refresh - case INCREMENTAL if index.isInstanceOf[StreamingRefresh] => + case AUTO if index.isInstanceOf[StreamingRefresh] => logInfo("Start refreshing index in streaming style") val job = index @@ -369,7 +369,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { Some(job.id.toString) // Otherwise, fall back to foreachBatch + batch refresh - case INCREMENTAL => + case AUTO => logInfo("Start refreshing index in foreach streaming style") val job = spark.readStream .options(options.extraSourceOptions(tableName)) @@ -439,6 +439,6 @@ object FlintSpark { */ object RefreshMode extends Enumeration { type RefreshMode = Value - val FULL, INCREMENTAL = Value + val MANUAL, AUTO = Value } } From 9cabe09095bcef6da20ef1063e09b7ace699fca4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 18 Jan 2024 11:09:15 -0800 Subject: [PATCH 3/4] Fix broken IT and update user manual Signed-off-by: Chen Dai --- docs/index.md | 4 ---- .../opensearch/flint/spark/FlintSpark.scala | 7 ------- .../FlintSparkCoveringIndexITSuite.scala | 1 + .../FlintSparkSkippingIndexITSuite.scala | 19 +------------------ 4 files changed, 2 insertions(+), 29 deletions(-) diff --git a/docs/index.md b/docs/index.md index 88c2bc5e6..1e89a6aa1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -515,10 +515,6 @@ CREATE INDEX Idx_elb ON alb_logs ... For now, only single or conjunct conditions (conditions connected by AND) in WHERE clause can be optimized by skipping index. -### Index Refresh Job Management - -Manual refreshing a table which already has skipping index being auto-refreshed, will be prevented. However, this assumption relies on the condition that the incremental refresh job is actively running in the same Spark cluster, which can be identified when performing the check. - ## Integration ### AWS EMR Spark Integration - Using execution role diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index cbcd517de..aa9dae660 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -320,9 +320,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { spark.read.format(FLINT_DATASOURCE).load(indexName) } - private def isIncrementalRefreshing(indexName: String): Boolean = - spark.streams.active.exists(_.name == indexName) - // TODO: move to separate class private def doRefreshIndex( index: FlintSparkIndex, @@ -344,10 +341,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { } val jobId = mode match { - case MANUAL if isIncrementalRefreshing(indexName) => - throw new IllegalStateException( - s"Index $indexName is incremental refreshing and cannot be manual refreshed") - case MANUAL => logInfo("Start refreshing index in batch style") batchRefresh() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 0d745bbb2..a177a9d1d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -96,6 +96,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .name(testIndex) .onTable(testTable) .addIndexColumns("name", "age") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() val jobId = flint.refreshIndex(testFlintIndex) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 9e9bf6d86..a3bdb11f2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -173,6 +173,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() val jobId = flint.refreshIndex(testIndex) @@ -187,24 +188,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { indexData should have size 2 } - test("should fail to manual refresh an incremental refreshing index") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - - val jobId = flint.refreshIndex(testIndex) - val job = spark.streams.get(jobId.get) - failAfter(streamingTimeout) { - job.processAllAvailable() - } - - assertThrows[IllegalStateException] { - flint.refreshIndex(testIndex) - } - } - test("can have only 1 skipping index on a table") { flint .skippingIndex() From 526e1d4c246fc551171ea11ed127c4f49c4e90cb Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 18 Jan 2024 13:28:15 -0800 Subject: [PATCH 4/4] Add IT for refresh an auto refresh index Signed-off-by: Chen Dai --- docs/index.md | 2 +- .../spark/FlintSparkSkippingIndexSqlITSuite.scala | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 1e89a6aa1..375732179 100644 --- a/docs/index.md +++ b/docs/index.md @@ -425,7 +425,7 @@ flint.skippingIndex() .addMinMax("request_processing_time") .create() -flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index", FULL) +flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index") // Covering index flint.coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 21de15de7..964dafeee 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -14,7 +14,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -144,6 +144,18 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("should fail if refresh an auto refresh skipping index") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH (auto_refresh = true) + | """.stripMargin) + + assertThrows[IllegalStateException] { + sql(s"REFRESH SKIPPING INDEX ON $testTable") + } + } + test("create skipping index if not exists") { sql(s""" | CREATE SKIPPING INDEX