diff --git a/README.md b/README.md
index aec0cdc9..4a01cd27 100644
--- a/README.md
+++ b/README.md
@@ -98,7 +98,6 @@ to identify which configuration options belong to a certain transformer instance
 
 ##### Spark settings
 | Property Name | Required | Description |
 | :--- | :---: | :--- |
-| `ingestor.spark.app.name` | Yes | User-defined name of the Spark application. See Spark property `spark.app.name` |
 | `ingestor.spark.termination.method` | No | Either `processAllAvailable` (stop query when no more messages are incoming) or `awaitTermination` (stop query on signal, e.g. Ctrl-C). Default: `awaitTermination`. See also [Combination of trigger and termination method](#combination-of-trigger-and-termination-method) |
 | `ingestor.spark.await.termination.timeout` | No | Timeout in milliseconds. Stops query when timeout is reached. This option is only valid with termination method `awaitTermination` |
diff --git a/driver/src/main/resources/Ingestion.properties.template b/driver/src/main/resources/Ingestion.properties.template
index 8cbfd437..ca4e38c2 100644
--- a/driver/src/main/resources/Ingestion.properties.template
+++ b/driver/src/main/resources/Ingestion.properties.template
@@ -24,9 +24,6 @@ component.transformer.id.0=[avro.decoder]
 component.transformer.class.[avro.decoder]=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
 component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
 
-# Spark settings
-ingestor.spark.app.name=ingestor-app-pane
-
 # Source(Kafka) settings
 reader.kafka.topic=souce-payload-topic
 reader.kafka.brokers=PLAINTEXT\://broker1\:9091,SSL\://broker2:9092
diff --git a/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestor.scala b/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestor.scala
index f49137b2..e253db29 100644
--- a/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestor.scala
+++ b/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestor.scala
@@ -104,7 +104,7 @@ object SparkIngestor extends SparkIngestorAttributes {
 
   def apply(conf: Configuration): SparkIngestor = {
     ComponentFactoryUtil.validateConfiguration(conf, getProperties)
-    val spark = getSparkSession(conf)
+    val spark = SparkSession.builder().getOrCreate()
     val terminationMethod = getTerminationMethod(conf)
     val awaitTerminationTimeout = getAwaitTerminationTimeoutMs(conf)
 
@@ -138,10 +138,4 @@ object SparkIngestor extends SparkIngestorAttributes {
       }
     )
   }
-
-  private def getSparkSession(conf: Configuration): SparkSession = {
-    val name = ConfigUtils.getOrThrow(KEY_APP_NAME, conf)
-    SparkSession.builder().appName(name).getOrCreate()
-  }
-
 }
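Note that `SparkSession.builder().getOrCreate()` reuses an already-running session if one exists, so the application name is now taken from the surrounding Spark environment (for example `spark-submit --name`, or the `spark.app.name` property) rather than from a Hyperdrive setting. A minimal sketch of a caller that still sets the name programmatically; the name shown is illustrative, not a Hyperdrive property:

    import org.apache.spark.sql.SparkSession

    // Sketch: create (or reuse) the session before SparkIngestor(conf) runs.
    // appName only takes effect if this call actually creates the session;
    // under spark-submit, --name / spark.app.name takes precedence.
    val spark = SparkSession
      .builder()
      .appName("my-ingestor-app") // illustrative name
      .getOrCreate()
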
diff --git a/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestorAttributes.scala b/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestorAttributes.scala
index a82857e1..a60ebb34 100644
--- a/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestorAttributes.scala
+++ b/driver/src/main/scala/za/co/absa/hyperdrive/driver/SparkIngestorAttributes.scala
@@ -20,7 +20,6 @@ import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetad
 
 trait SparkIngestorAttributes extends HasComponentAttributes {
   val keysPrefix = "ingestor.spark"
-  val KEY_APP_NAME = s"$keysPrefix.app.name"
   val KEY_TERMINATION_METHOD = s"$keysPrefix.termination.method"
   val KEY_AWAIT_TERMINATION_TIMEOUT = s"$keysPrefix.await.termination.timeout"
 
@@ -29,7 +28,6 @@ trait SparkIngestorAttributes extends HasComponentAttributes {
 
   override def getDescription: String = "Component that invokes Spark for the ingestion"
 
   override def getProperties: Map[String, PropertyMetadata] = Map(
-    KEY_APP_NAME -> PropertyMetadata("Name of Spark application", None, required = true),
     KEY_TERMINATION_METHOD -> PropertyMetadata("Termination method", Some(s"Either '$ProcessAllAvailable' (stop when no more messages arrive) or '$AwaitTermination' (stop on signal)." +
       s" Default is '$ProcessAllAvailable'"), required = false),
diff --git a/driver/src/test/resources/ingestion.properties b/driver/src/test/resources/ingestion.properties
index 391881bd..9e73e2ed 100644
--- a/driver/src/test/resources/ingestion.properties
+++ b/driver/src/test/resources/ingestion.properties
@@ -13,7 +13,6 @@
 # limitations under the License.
 #
 
-ingestor.spark.app.name=any_name
 reader.kafka.brokers=localhost:9092,otherhost:9093
 ssl.keystore.password=any-keystore!!@#$% password
 ssl.truststore.password=kd9910))383(((*-+
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/TestSparkIngestor.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/TestSparkIngestor.scala
index a376e5bc..eddfde10 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/TestSparkIngestor.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/TestSparkIngestor.scala
@@ -38,11 +38,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
   private val dataFrame: DataFrame = mock[DataFrame]
   private val streamingQuery: StreamingQuery = mock[StreamingQuery]
 
-  private val configuration = {
-    val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-app-name")
-    config
-  }
+  private val configuration = new BaseConfiguration
 
   override def beforeEach(): Unit = {
     reset(
@@ -100,23 +96,8 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
     verify(streamingQuery).awaitTermination()
   }
 
-  it should "use the configured app name" in {
-    val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-app-name")
-    val sparkIngestor = SparkIngestor(config)
-
-    sparkIngestor.spark.conf.get("spark.app.name") shouldBe "my-app-name"
-  }
-
-  it should "throw if no app name is configured" in {
-    val throwable = intercept[IllegalArgumentException](SparkIngestor(new BaseConfiguration))
-
-    throwable.getMessage should include(SparkIngestor.KEY_APP_NAME)
-  }
-
   it should "use terminationMethod awaitTermination if configured" in {
     val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
     val sparkIngestor = SparkIngestor(config)
     when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
     when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
@@ -129,7 +110,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
 
   it should "use timeout if configured with terminationMethod awaitTermination" in {
     val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
     config.addProperty(s"${SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT}", "10000")
     val sparkIngestor = SparkIngestor(config)
     when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
@@ -143,7 +123,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
 
   it should "throw if an invalid terminationMethod is configured" in {
     val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
     config.addProperty(s"${SparkIngestor.KEY_TERMINATION_METHOD}", "non-existent")
 
     val throwable = intercept[IllegalArgumentException](SparkIngestor(config))
@@ -152,7 +131,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
 
   it should "throw if a timeout is not a number" in {
     val config = new BaseConfiguration
-    config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
     config.addProperty(s"${SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT}", "nan")
 
     val throwable = intercept[IllegalArgumentException](SparkIngestor(config))
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala
index c675c66d..f6d78058 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala
@@ -162,7 +162,6 @@ class KafkaToKafkaDeduplicationAfterRetryDockerTest extends FlatSpec with Matche
     "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",
 
     // Spark settings
-    "ingestor.spark.app.name" -> "ingestor-app",
     "ingestor.spark.termination.timeout" -> "60000",
 
     // Source(Kafka) settings
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
index 493ebd64..8e06bd9e 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
@@ -93,7 +93,6 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
     "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",
 
     // Spark settings
-    "ingestor.spark.app.name" -> "ingestor-app",
     "ingestor.spark.termination.method" -> "ProcessAllAvailable",
 
     // Source(Kafka) settings
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetDockerTest.scala
index 418fd4ea..b3f33293 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetDockerTest.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetDockerTest.scala
@@ -78,9 +78,6 @@ class KafkaToParquetDockerTest extends FlatSpec with Matchers with SparkTestBase
     "component.transformer.class.column.selector" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer",
     "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",
 
-    // Spark settings
-    "ingestor.spark.app.name" -> "ingestor-app",
-
     // Source(Kafka) settings
     "reader.kafka.topic" -> topic,
     "reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala
index feaf3206..038d4ad0 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala
@@ -68,9 +68,6 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
     "component.transformer.class.[column.renamer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer",
     "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",
 
-    // Spark settings
-    "ingestor.spark.app.name" -> "ingestor-app",
-
     // Source(Kafka) settings
     "reader.kafka.topic" -> topic,
     "reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestCommandLineIngestionDriver.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestCommandLineIngestionDriver.scala
index 42765fea..53c82eff 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestCommandLineIngestionDriver.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestCommandLineIngestionDriver.scala
@@ -16,7 +16,6 @@
 package za.co.absa.hyperdrive.driver.drivers
 
 import org.scalatest.{FlatSpec, Matchers}
-import za.co.absa.hyperdrive.driver.SparkIngestor.KEY_APP_NAME
 
 class TestCommandLineIngestionDriver extends FlatSpec with Matchers {
 
@@ -27,7 +26,6 @@ class TestCommandLineIngestionDriver extends FlatSpec with Matchers {
 
     val config = CommandLineIngestionDriver.parseConfiguration(settings)
 
-    config.getString("ingestor.spark.app.name") shouldBe "any_name"
     config.getStringArray("reader.kafka.brokers") shouldBe Array("localhost:9092", "otherhost:9093")
     config.getString("ssl.keystore.password") shouldBe "any-keystore!!@#$% password"
     config.getString("ssl.truststore.password") shouldBe "kd9910))383(((*-+"
@@ -54,7 +52,6 @@ class TestCommandLineIngestionDriver extends FlatSpec with Matchers {
   private def getSettings: Array[String] = {
     import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_BROKERS
     Array(
-      s"$KEY_APP_NAME=any_name",
       s"$KEY_BROKERS=localhost:9092,otherhost:9093",
       "ssl.keystore.password=any-keystore!!@#$% password",
       "ssl.truststore.password=kd9910))383(((*-+",
diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestPropertiesIngestionDriver.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestPropertiesIngestionDriver.scala
index cd9452ab..e6e854d9 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestPropertiesIngestionDriver.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/TestPropertiesIngestionDriver.scala
@@ -16,7 +16,6 @@
 package za.co.absa.hyperdrive.driver.drivers
 
 import org.scalatest.{FlatSpec, Matchers}
-import za.co.absa.hyperdrive.driver.SparkIngestor.KEY_APP_NAME
 
 class TestPropertiesIngestionDriver extends FlatSpec with Matchers {
 
@@ -27,7 +26,6 @@ class TestPropertiesIngestionDriver extends FlatSpec with Matchers {
 
     val config = PropertiesIngestionDriver.loadConfiguration(configurationFile)
 
-    config.getString("ingestor.spark.app.name") shouldBe "any_name"
     config.getStringArray("reader.kafka.brokers") shouldBe Array("localhost:9092", "otherhost:9093")
     config.getString("ssl.keystore.password") shouldBe "any-keystore!!@#$% password"
     config.getString("ssl.truststore.password") shouldBe "kd9910))383(((*-+"
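With `ingestor.spark.app.name` removed, the driver no longer validates or applies an application name; deployments are expected to name the application through Spark itself, and tests rely on whatever session is already active. A hedged sketch of a test bootstrap under that assumption (the master and name values are illustrative):

    import org.apache.spark.sql.SparkSession

    // Assumed local-mode test setup: create the named session up front so
    // that the ingestor's SparkSession.builder().getOrCreate() reuses it.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hyperdrive-docker-test") // illustrative name
      .getOrCreate()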