Skip to content

Commit

Permalink
Remove ingestor.spark.app.name (#268)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Wallimann <[email protected]>
  • Loading branch information
kevinwallimann authored Jun 23, 2022
1 parent 4029684 commit ed18c59
Show file tree
Hide file tree
Showing 12 changed files with 2 additions and 50 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ to identify which configuration options belong to a certain transformer instance
##### Spark settings
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `ingestor.spark.app.name` | Yes | User-defined name of the Spark application. See Spark property `spark.app.name` |
| `ingestor.spark.termination.method` | No | Either `processAllAvailable` (stop query when no more messages are incoming) or `awaitTermination` (stop query on signal, e.g. Ctrl-C). Default: `awaitTermination`. See also [Combination of trigger and termination method](#combination-of-trigger-and-termination-method) |
| `ingestor.spark.await.termination.timeout` | No | Timeout in milliseconds. Stops query when timeout is reached. This option is only valid with termination method `awaitTermination` |

Expand Down
3 changes: 0 additions & 3 deletions driver/src/main/resources/Ingestion.properties.template
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ component.transformer.id.0=[avro.decoder]
component.transformer.class.[avro.decoder]=za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter

# Spark settings
ingestor.spark.app.name=ingestor-app-pane

# Source(Kafka) settings
reader.kafka.topic=source-payload-topic
reader.kafka.brokers=PLAINTEXT\://broker1\:9091,SSL\://broker2:9092
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ object SparkIngestor extends SparkIngestorAttributes {

def apply(conf: Configuration): SparkIngestor = {
ComponentFactoryUtil.validateConfiguration(conf, getProperties)
val spark = getSparkSession(conf)
val spark = SparkSession.builder().getOrCreate()
val terminationMethod = getTerminationMethod(conf)
val awaitTerminationTimeout = getAwaitTerminationTimeoutMs(conf)

Expand Down Expand Up @@ -138,10 +138,4 @@ object SparkIngestor extends SparkIngestorAttributes {
}
)
}

private def getSparkSession(conf: Configuration): SparkSession = {
val name = ConfigUtils.getOrThrow(KEY_APP_NAME, conf)
SparkSession.builder().appName(name).getOrCreate()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetad

trait SparkIngestorAttributes extends HasComponentAttributes {
val keysPrefix = "ingestor.spark"
val KEY_APP_NAME = s"$keysPrefix.app.name"
val KEY_TERMINATION_METHOD = s"$keysPrefix.termination.method"
val KEY_AWAIT_TERMINATION_TIMEOUT = s"$keysPrefix.await.termination.timeout"

Expand All @@ -29,7 +28,6 @@ trait SparkIngestorAttributes extends HasComponentAttributes {
override def getDescription: String = "Component that invokes Spark for the ingestion"

override def getProperties: Map[String, PropertyMetadata] = Map(
KEY_APP_NAME -> PropertyMetadata("Name of Spark application", None, required = true),
KEY_TERMINATION_METHOD -> PropertyMetadata("Termination method",
Some(s"Either '$ProcessAllAvailable' (stop when no more messages arrive) or '$AwaitTermination' (stop on signal)." +
s" Default is '$ProcessAllAvailable'"), required = false),
Expand Down
1 change: 0 additions & 1 deletion driver/src/test/resources/ingestion.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
#

ingestor.spark.app.name=any_name
reader.kafka.brokers=localhost:9092,otherhost:9093
ssl.keystore.password=any-keystore!!@#$% password
ssl.truststore.password=kd9910))383(((*-+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug

private val dataFrame: DataFrame = mock[DataFrame]
private val streamingQuery: StreamingQuery = mock[StreamingQuery]
private val configuration = {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-app-name")
config
}
private val configuration = new BaseConfiguration

override def beforeEach(): Unit = {
reset(
Expand Down Expand Up @@ -100,23 +96,8 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug
verify(streamingQuery).awaitTermination()
}

it should "use the configured app name" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-app-name")
val sparkIngestor = SparkIngestor(config)

sparkIngestor.spark.conf.get("spark.app.name") shouldBe "my-app-name"
}

it should "throw if no app name is configured" in {
val throwable = intercept[IllegalArgumentException](SparkIngestor(new BaseConfiguration))

throwable.getMessage should include(SparkIngestor.KEY_APP_NAME)
}

it should "use terminationMethod awaitTermination if configured" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
val sparkIngestor = SparkIngestor(config)
when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
Expand All @@ -129,7 +110,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug

it should "use timeout if configured with terminationMethod awaitTermination" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT}", "10000")
val sparkIngestor = SparkIngestor(config)
when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
Expand All @@ -143,7 +123,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug

it should "throw if an invalid terminationMethod is configured" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_TERMINATION_METHOD}", "non-existent")
val throwable = intercept[IllegalArgumentException](SparkIngestor(config))

Expand All @@ -152,7 +131,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSug

it should "throw if a timeout is not a number" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT}", "nan")
val throwable = intercept[IllegalArgumentException](SparkIngestor(config))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class KafkaToKafkaDeduplicationAfterRetryDockerTest extends FlatSpec with Matche
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",
"ingestor.spark.termination.timeout" -> "60000",

// Source(Kafka) settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",
"ingestor.spark.termination.method" -> "ProcessAllAvailable",

// Source(Kafka) settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class KafkaToParquetDockerTest extends FlatSpec with Matchers with SparkTestBase
"component.transformer.class.column.selector" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer",
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",

// Source(Kafka) settings
"reader.kafka.topic" -> topic,
"reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
"component.transformer.class.[column.renamer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer",
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",

// Source(Kafka) settings
"reader.kafka.topic" -> topic,
"reader.kafka.brokers" -> kafkaSchemaRegistryWrapper.kafkaUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package za.co.absa.hyperdrive.driver.drivers

import org.scalatest.{FlatSpec, Matchers}
import za.co.absa.hyperdrive.driver.SparkIngestor.KEY_APP_NAME

class TestCommandLineIngestionDriver extends FlatSpec with Matchers {

Expand All @@ -27,7 +26,6 @@ class TestCommandLineIngestionDriver extends FlatSpec with Matchers {

val config = CommandLineIngestionDriver.parseConfiguration(settings)

config.getString("ingestor.spark.app.name") shouldBe "any_name"
config.getStringArray("reader.kafka.brokers") shouldBe Array("localhost:9092", "otherhost:9093")
config.getString("ssl.keystore.password") shouldBe "any-keystore!!@#$% password"
config.getString("ssl.truststore.password") shouldBe "kd9910))383(((*-+"
Expand All @@ -54,7 +52,6 @@ class TestCommandLineIngestionDriver extends FlatSpec with Matchers {
private def getSettings: Array[String] = {
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_BROKERS
Array(
s"$KEY_APP_NAME=any_name",
s"$KEY_BROKERS=localhost:9092,otherhost:9093",
"ssl.keystore.password=any-keystore!!@#$% password",
"ssl.truststore.password=kd9910))383(((*-+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package za.co.absa.hyperdrive.driver.drivers

import org.scalatest.{FlatSpec, Matchers}
import za.co.absa.hyperdrive.driver.SparkIngestor.KEY_APP_NAME

class TestPropertiesIngestionDriver extends FlatSpec with Matchers {

Expand All @@ -27,7 +26,6 @@ class TestPropertiesIngestionDriver extends FlatSpec with Matchers {

val config = PropertiesIngestionDriver.loadConfiguration(configurationFile)

config.getString("ingestor.spark.app.name") shouldBe "any_name"
config.getStringArray("reader.kafka.brokers") shouldBe Array("localhost:9092", "otherhost:9093")
config.getString("ssl.keystore.password") shouldBe "any-keystore!!@#$% password"
config.getString("ssl.truststore.password") shouldBe "kd9910))383(((*-+"
Expand Down

0 comments on commit ed18c59

Please sign in to comment.