Skip to content

Commit

Permalink
add spark job name and description (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggzone authored Dec 7, 2021
1 parent cd851ea commit ce4c5fd
Showing 1 changed file with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ object Exchange {
if (configs.tagsConfig.nonEmpty) {
for (tagConfig <- configs.tagsConfig) {
LOG.info(s"Processing Tag ${tagConfig.name}")
spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}")

val fieldKeys = tagConfig.fields
LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
Expand Down Expand Up @@ -168,6 +169,7 @@ object Exchange {
if (configs.edgesConfig.nonEmpty) {
for (edgeConfig <- configs.edgesConfig) {
LOG.info(s"Processing Edge ${edgeConfig.name}")
spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}")

val fieldKeys = edgeConfig.fields
LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
Expand Down Expand Up @@ -211,6 +213,8 @@ object Exchange {

// reimport for failed tags and edges
if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
spark.sparkContext.setJobGroup("Reload", s"Reload: ${configs.errorConfig.errorPath}")

val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(configs.errorConfig.errorPath)
Expand Down

0 comments on commit ce4c5fd

Please sign in to comment.