From ce4c5fdd2ff0366eb8055c9cee81e4e22558b152 Mon Sep 17 00:00:00 2001 From: ggzone Date: Tue, 7 Dec 2021 18:39:04 +0800 Subject: [PATCH] add spark job name and description (#33) --- .../src/main/scala/com/vesoft/nebula/exchange/Exchange.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 27f75101..903209fa 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -125,6 +125,7 @@ object Exchange { if (configs.tagsConfig.nonEmpty) { for (tagConfig <- configs.tagsConfig) { LOG.info(s"Processing Tag ${tagConfig.name}") + spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}") val fieldKeys = tagConfig.fields LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") @@ -168,6 +169,7 @@ object Exchange { if (configs.edgesConfig.nonEmpty) { for (edgeConfig <- configs.edgesConfig) { LOG.info(s"Processing Edge ${edgeConfig.name}") + spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}") val fieldKeys = edgeConfig.fields LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") @@ -211,6 +213,8 @@ object Exchange { // reimport for failed tags and edges if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) { + spark.sparkContext.setJobGroup("Reload", s"Reload: ${configs.errorConfig.errorPath}") + val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") val data = spark.read.text(configs.errorConfig.errorPath)