From 93fe258ff300d9c6a2ac698f343e9605f0f57c65 Mon Sep 17 00:00:00 2001 From: Anqi Date: Fri, 13 Oct 2023 17:54:39 +0800 Subject: [PATCH] add import status and records count (#162) * add import status and records count * update test --- .../exchange/common/GraphProvider.scala | 2 +- .../exchange/common/config/Configs.scala | 18 ++- .../common/processor/ReloadProcessor.scala | 2 +- .../common/writer/FileBaseWriter.scala | 6 +- .../common/writer/ServerBaseWriter.scala | 18 +-- .../com/vesoft/nebula/exchange/Exchange.scala | 146 +++++++++++------ .../exchange/processor/EdgeProcessor.scala | 8 +- .../processor/VerticesProcessor.scala | 10 +- .../processor/EdgeProcessorSuite.scala | 54 ++++--- .../processor/VerticesProcessorSuite.scala | 48 +++++- .../com/vesoft/nebula/exchange/Exchange.scala | 147 ++++++++++++------ .../exchange/processor/EdgeProcessor.scala | 12 +- .../processor/VerticesProcessor.scala | 12 +- .../exchange/reader/ServerBaseReader.scala | 4 +- .../processor/EdgeProcessorSuite.scala | 13 +- .../processor/VerticesProcessorSuite.scala | 35 ++++- .../com/vesoft/nebula/exchange/Exchange.scala | 144 +++++++++++------ .../exchange/processor/EdgeProcessor.scala | 13 +- .../processor/VerticesProcessor.scala | 12 +- .../processor/EdgeProcessorSuite.scala | 13 +- .../processor/VerticesProcessorSuite.scala | 37 ++++- 21 files changed, 540 insertions(+), 214 deletions(-) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala index 68d0f964..5c7dcb02 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/GraphProvider.scala @@ -65,7 +65,7 @@ class GraphProvider(addresses: List[HostAddress], timeout: Int, sslConfigEntry: def switchSpace(session: Session, space: String): ResultSet = { val switchStatment = s"use $space" - LOG.info(s"switch space $space") + LOG.info(s">>>>>> switch space $space") val result = submit(session, switchStatment) result } diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index 5497a584..0627c393 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -66,7 +66,9 @@ case class DataBaseConfigEntry(graphAddress: List[String], "nebula.address.meta has wrong format, please make sure the format is [\"ip1:port1\",\"ip2:port2\"]") } - override def toString: String = super.toString + override def toString: String = { + s"DataBaseConfigEntry:{graphAddress:$graphAddress, space:$space, metaAddress:$metaAddresses}" + } def getGraphAddress: List[HostAddress] = { val hostAndPorts = new ListBuffer[HostAddress] @@ -94,7 +96,8 @@ case class DataBaseConfigEntry(graphAddress: List[String], case class UserConfigEntry(user: String, password: String) { require(user.trim.nonEmpty && password.trim.nonEmpty) - override def toString: String = super.toString + override def toString: String = + s"UserConfigEntry{user:$user, password:xxxxx}" } /** @@ -106,7 +109,7 @@ case class UserConfigEntry(user: String, password: String) { case class ConnectionConfigEntry(timeout: Int, retry: Int) { require(timeout > 0 && retry > 0) - override def toString: String = super.toString + override def toString: String = 
s"cConnectionConfigEntry:{timeout:$timeout, retry:$retry}" } /** @@ -119,7 +122,7 @@ case class ConnectionConfigEntry(timeout: Int, retry: Int) { case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) { require(timeout > 0 && retry > 0 && interval > 0) - override def toString: String = super.toString + override def toString: String = s"ExecutionConfigEntry:{timeout:$timeout, retry:$retry}" } /** @@ -131,7 +134,8 @@ case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) { case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) { require(errorPath.trim.nonEmpty && errorMaxSize > 0) - override def toString: String = super.toString + override def toString: String = + s"ErrorConfigEntry:{errorPath:$errorPath, errorMaxSize:$errorMaxSize}" } /** @@ -143,7 +147,7 @@ case class ErrorConfigEntry(errorPath: String, errorMaxSize: Int) { case class RateConfigEntry(limit: Int, timeout: Int) { require(limit > 0 && timeout > 0) - override def toString: String = super.toString + override def toString: String = s"RateConfigEntry:{limit:$limit, timeout:$timeout}" } /** @@ -168,7 +172,7 @@ case class SslConfigEntry(enableGraph: Boolean, } } - override def toString: String = super.toString + override def toString: String = s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}" } case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String) diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala index 0b40d40d..836ada79 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/processor/ReloadProcessor.scala @@ -61,7 +61,7 @@ class ReloadProcessor(data: DataFrame, .getPartitionId()}") errorBuffer.clear() } - LOG.info(s"data reload in partition ${TaskContext + LOG.info(s">>>>> data reload in partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() graphProvider.close() diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala index 9359a43b..724f9779 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/FileBaseWriter.scala @@ -26,10 +26,10 @@ class NebulaSSTWriter(path: String) extends Writer { try { RocksDB.loadLibrary() - LOG.info("Loading RocksDB successfully") + LOG.info(">>>>> Loading RocksDB successfully") } catch { case _: Exception => - LOG.error("Can't load RocksDB library!") + LOG.error(">>>>> Can't load RocksDB library!") } // TODO More Config ... 
@@ -108,7 +108,7 @@ class GenerateSstFile extends Serializable { } } catch { case e: Throwable => { - LOG.error("sst file write error,", e) + LOG.error(">>>>> sst file write error,", e) batchFailure.add(1) } } finally { diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala index 46b3b120..56005139 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala @@ -292,7 +292,7 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage) } - LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}") + LOG.info(s">>>>>> Connection to ${dataBaseConfigEntry.graphAddress}") } def execute(vertices: Vertices, writeMode: WriteMode.Mode): String = { @@ -329,16 +329,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, val result = graphProvider.submit(session, statement) if (result.isSucceeded) { LOG.info( - s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})") + s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})") return null } - LOG.error(s"write vertex failed for ${result.getErrorMessage}") + LOG.error(s">>>>> write vertex failed for ${result.getErrorMessage} statement: \n $statement") if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}") } } else { - LOG.error(s"write vertex failed because write speed is too fast") + LOG.error(s">>>>>> write vertex failed because write speed is too fast") } statement } @@ -349,16 +349,16 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, val result = graphProvider.submit(session, statement) if (result.isSucceeded) { LOG.info( - s" write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)") + s">>>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result.getLatency}us)") return null } - LOG.error(s"write edge failed for ${result.getErrorMessage}") + LOG.error(s">>>>>> write edge failed for ${result.getErrorMessage}") if (result.getErrorCode == ErrorCode.E_BAD_PERMISSION.getValue) { throw new RuntimeException( s"write ${config.name} failed for E_BAD_PERMISSION: ${result.getErrorMessage}") } } else { - LOG.error(s"write vertex failed because write speed is too fast") + LOG.error(s">>>>>> write vertex failed because write speed is too fast") } statement } @@ -369,9 +369,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, if (result.isSucceeded) { return null } - LOG.error(s"reimport ngql failed for ${result.getErrorMessage}") + LOG.error(s">>>>>> reimport ngql failed for ${result.getErrorMessage}") } else { - LOG.error(s"reimport ngql failed because write speed is too fast") + LOG.error(s">>>>>> reimport ngql failed because write speed is too fast") } ngql } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 356b92bf..59405337 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ 
b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -72,12 +72,12 @@ object Exchange { val c: Argument = options match { case Some(config) => config case _ => - LOG.error("Argument parse failed") + LOG.error(">>>>>> Argument parse failed") sys.exit(-1) } val configs = Configs.parse(c.config, c.variable, c.param) - LOG.info(s"Config ${configs}") + LOG.info(s">>>>>> Config ${configs}") val session = SparkSession .builder() @@ -95,7 +95,7 @@ object Exchange { if (c.hive) { if (configs.hiveConfigEntry.isEmpty) { LOG.info( - "you don't com.vesoft.exchange.common.config hive source, so using hive tied with spark.") + ">>>>>> you don't com.vesoft.exchange.common.config hive source, so using hive tied with spark.") } else { val hiveConfig = configs.hiveConfigEntry.get sparkConf.set("spark.sql.warehouse.dir", hiveConfig.warehouse) @@ -116,17 +116,29 @@ object Exchange { val spark = session.getOrCreate() // check the spark version SparkValidate.validate(spark.version, "2.2.*") + val startTime = System.currentTimeMillis() + var totalClientBatchSuccess: Long = 0L + var totalClientBatchFailure: Long = 0L + var totalClientRecordSuccess: Long = 0L + var totalClientRecordFailure: Long = 0L + var totalSstRecordSuccess: Long = 0l + var totalSstRecordFailure: Long = 0L // reload for failed import tasks - if (!c.reload.isEmpty) { + if (c.reload.nonEmpty) { val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload") + val start = System.currentTimeMillis() val data = spark.read.text(c.reload) val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) processor.process() - LOG.info(s"batchSuccess.reload: ${batchSuccess.value}") - LOG.info(s"batchFailure.reload: ${batchFailure.value}") + LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}") + LOG.info( + s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) + .formatted("%.2f")}s") + sys.exit(0) } @@ -136,13 +148,15 @@ object Exchange { // import tags if (configs.tagsConfig.nonEmpty) { for (tagConfig <- configs.tagsConfig) { - LOG.info(s"Processing Tag ${tagConfig.name}") + LOG.info(s">>>>>> Processing Tag ${tagConfig.name}") spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}") + val start = System.currentTimeMillis() + val fieldKeys = tagConfig.fields - LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") + LOG.info(s">>>>>> field keys: ${fieldKeys.mkString(", ")}") val nebulaKeys = tagConfig.nebulaFields - LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}") + LOG.info(s">>>>>> nebula keys: ${nebulaKeys.mkString(", ")}") val fields = tagConfig.vertexField :: tagConfig.fields val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields) @@ -155,11 +169,12 @@ object Exchange { } else { data.get } - val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}") + val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.${tagConfig.name}") + val recordFailure = spark.sparkContext.longAccumulator(s"recordFailure.${tagConfig.name}") val processor = new VerticesProcessor( spark, @@ -169,34 +184,49 @@ object Exchange { nebulaKeys, configs, batchSuccess, - batchFailure + batchFailure, + recordSuccess, + 
recordFailure ) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, total time: ${costTime}s") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>>> import for tag ${tagConfig.name}, total time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { - LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") - LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") + LOG.info( + s">>>>> Client-Import: recordSuccess.${tagConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") + LOG.info( + s">>>>> Client-Import: recordFailure.${tagConfig.name}: ${recordFailure.value}") failures += batchFailure.value + totalClientRecordSuccess += recordSuccess.value + totalClientRecordFailure += recordFailure.value + totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure += batchFailure.value } else { - LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> SST-Import: success.${tagConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> SST-Import: failure.${tagConfig.name}: ${recordFailure.value}") + totalSstRecordSuccess += recordSuccess.value + totalSstRecordFailure += recordFailure.value } } } } else { - LOG.warn("Tag is not defined") + LOG.warn(">>>>>> Tag is not defined") } // import edges if (configs.edgesConfig.nonEmpty) { for (edgeConfig <- configs.edgesConfig) { - LOG.info(s"Processing Edge ${edgeConfig.name}") + LOG.info(s">>>>>> Processing Edge ${edgeConfig.name}") spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}") + val start = System.currentTimeMillis() + val fieldKeys = edgeConfig.fields - LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") + LOG.info(s">>>>>> field keys: ${fieldKeys.mkString(", ")}") val nebulaKeys = edgeConfig.nebulaFields - LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}") + LOG.info(s">>>>>> nebula keys: ${nebulaKeys.mkString(", ")}") val fields = if (edgeConfig.rankingField.isDefined) { edgeConfig.rankingField.get :: edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields } else { @@ -215,9 +245,12 @@ object Exchange { df = dataUdf(df, edgeConfig.dstVertexUdf.get) } - val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") + val recordSuccess = + spark.sparkContext.longAccumulator(s"recordSuccess.${edgeConfig.name}") + val recordFailure = + spark.sparkContext.longAccumulator(s"recordFailure.${edgeConfig.name}") val processor = new EdgeProcessor( spark, @@ -227,22 +260,34 @@ object Exchange { nebulaKeys, configs, batchSuccess, - batchFailure + batchFailure, + recordSuccess, + recordFailure ) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for edge ${edgeConfig.name}, total time: ${costTime}s") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>>> import for edge ${edgeConfig.name}, total time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { - 
LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") - LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") + LOG.info( + s">>>>> Client-Import: recordSuccess.${edgeConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info( + s">>>>> Client-Import: recordFailure.${edgeConfig.name}: ${recordFailure.value}") failures += batchFailure.value + totalClientRecordSuccess += recordSuccess.value + totalClientRecordFailure += recordFailure.value + totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure += batchFailure.value } else { - LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> SST-Import: failure.${edgeConfig.name}: ${recordFailure.value}") + totalSstRecordSuccess += recordSuccess.value + totalSstRecordFailure += recordFailure.value } } } } else { - LOG.warn("Edge is not defined") + LOG.warn(">>>>>> Edge is not defined") } // reimport for failed tags and edges @@ -253,15 +298,26 @@ object Exchange { val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") val data = spark.read.text(errorPath) - val startTime = System.currentTimeMillis() + val start = System.currentTimeMillis() val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"reimport ngql cost time: ${costTime}") - LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}") - LOG.info(s"batchFailure.reimport: ${batchFailure.value}") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>> reimport ngql cost time: ${costTime}") + LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") + totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure -= batchSuccess.value } spark.close() + LOG.info( + s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0) + .formatted("%.2f")}s \n" + + s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" + + s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" + + s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" + + s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + + s">>>>>> total SST Success:${totalSstRecordSuccess}") } /** @@ -279,54 +335,56 @@ object Exchange { config.category match { case SourceCategory.PARQUET => val parquetConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading Parquet files from ${parquetConfig.path}""") + LOG.info(s""">>>>>> Loading Parquet files from ${parquetConfig.path}""") val reader = new ParquetReader(session, parquetConfig) Some(reader.read()) case SourceCategory.ORC => val orcConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading ORC files from ${orcConfig.path}""") + LOG.info(s""">>>>>> Loading ORC files from ${orcConfig.path}""") val reader = new ORCReader(session, orcConfig) Some(reader.read()) case SourceCategory.JSON => val jsonConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading 
JSON files from ${jsonConfig.path}""") + LOG.info(s""">>>>>> Loading JSON files from ${jsonConfig.path}""") val reader = new JSONReader(session, jsonConfig) Some(reader.read()) case SourceCategory.CSV => val csvConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading CSV files from ${csvConfig.path}""") + LOG.info(s""">>>>>> Loading CSV files from ${csvConfig.path}""") val reader = new CSVReader(session, csvConfig) Some(reader.read()) case SourceCategory.HIVE => val hiveConfig = config.asInstanceOf[HiveSourceConfigEntry] - LOG.info(s"""Loading from Hive and exec ${hiveConfig.sentence}""") + LOG.info(s""">>>>>> Loading from Hive and exec ${hiveConfig.sentence}""") val reader = new HiveReader(session, hiveConfig) Some(reader.read()) case SourceCategory.KAFKA => { val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry] - LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""") + LOG.info( + s""">>>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""") val reader = new KafkaReader(session, kafkaConfig, fields) Some(reader.read()) } case SourceCategory.NEO4J => val neo4jConfig = config.asInstanceOf[Neo4JSourceConfigEntry] - LOG.info(s"Loading from neo4j com.vesoft.exchange.common.config: ${neo4jConfig}") + LOG.info(s">>>>>> Loading from neo4j com.vesoft.exchange.common.config: ${neo4jConfig}") val reader = new Neo4JReader(session, neo4jConfig) Some(reader.read()) case SourceCategory.MYSQL => val mysqlConfig = config.asInstanceOf[MySQLSourceConfigEntry] - LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}") + LOG.info(s">>>>>> Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}") val reader = new MySQLReader(session, mysqlConfig) Some(reader.read()) case SourceCategory.POSTGRESQL => val postgreConfig = config.asInstanceOf[PostgreSQLSourceConfigEntry] - LOG.info(s"Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}") + LOG.info( + s">>>>>> Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}") val reader = new PostgreSQLReader(session, postgreConfig) Some(reader.read()) case SourceCategory.PULSAR => val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry] - LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}") + LOG.info(s">>>>>> Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}") val reader = new PulsarReader(session, pulsarConfig) Some(reader.read()) case SourceCategory.JANUS_GRAPH => @@ -357,7 +415,7 @@ object Exchange { Some(reader.read()) } case _ => { - LOG.error(s"Data source ${config.category} not supported") + LOG.error(s">>>>>> Data source ${config.category} not supported") None } } diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 1b51bfa7..471e5fbe 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -41,7 +41,9 @@ class EdgeProcessor(spark: SparkSession, nebulaKeys: List[String], config: Configs, batchSuccess: LongAccumulator, - batchFailure: LongAccumulator) + batchFailure: LongAccumulator, + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -70,9 +72,11 @@ class 
EdgeProcessor(spark: SparkSession, val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex) if (failStatement == null) { batchSuccess.add(1) + recordSuccess.add(edge.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(edge.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -396,7 +400,7 @@ class EdgeProcessor(spark: SparkSession, srcBytes) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 3de3ff71..1213c764 100644 --- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -49,7 +49,9 @@ class VerticesProcessor(spark: SparkSession, nebulaKeys: List[String], config: Configs, batchSuccess: LongAccumulator, - batchFailure: LongAccumulator) + batchFailure: LongAccumulator, + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -77,9 +79,11 @@ class VerticesProcessor(spark: SparkSession, val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) if (failStatement == null) { batchSuccess.add(1) + recordSuccess.add(vertex.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -94,7 +98,7 @@ class VerticesProcessor(spark: SparkSession, s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") errorBuffer.clear() } - LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext + LOG.info(s">>>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() graphProvider.close() @@ -314,7 +318,7 @@ class VerticesProcessor(spark: SparkSession, val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index d468eda7..cda11f92 100644 --- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -14,7 +14,16 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, SchemaProp} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
-import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -57,7 +66,7 @@ class EdgeProcessorSuite { "col14") val processClazz = - new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null) + new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null, null, null) @Test def isEdgeValidSuite(): Unit = { val stringIdValue = List("Bob", "Tom") @@ -143,27 +152,28 @@ class EdgeProcessorSuite { assert(edge.toString.equals( "Edge: \"1\"->\"2\"@0 values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")")) - val writeMode = WriteMode.INSERT + val writeMode = WriteMode.INSERT val edgeConfigEntryWithPrefix = EdgeConfigEntry("friend", - null, - null, - fieldKeys, - nebulaKeys, - writeMode, - "src", - None, - "src", - None, - "dst", - None, - "dst", - false, - None, - None, - 10, - 10, - None) - val edgeWithPrefix = processClazz.convertToEdge(row, edgeConfigEntryWithPrefix, true, fieldKeys, map) + null, + null, + fieldKeys, + nebulaKeys, + writeMode, + "src", + None, + "src", + None, + "dst", + None, + "dst", + false, + None, + None, + 10, + 10, + None) + val edgeWithPrefix = + processClazz.convertToEdge(row, edgeConfigEntryWithPrefix, true, fieldKeys, map) assert(edgeWithPrefix.source.equals("\"src_1\"")) assert(edgeWithPrefix.destination.equals("\"dst_2\"")) } diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index 36526133..bc051336 100644 --- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -15,7 +15,16 @@ import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, Tag import org.apache.commons.codec.binary.Hex import org.apache.log4j.Logger import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -60,7 +69,16 @@ class VerticesProcessorSuite { "col14") val processClazz = - new VerticesProcessor(null, data, tagConfig, fieldKeys, nebulaKeys, config, null, null) + new VerticesProcessor(null, + data, + tagConfig, + fieldKeys, + nebulaKeys, + config, + null, + null, + null, + null) @Test def isVertexValidSuite(): Unit = { val stringIdValue = List("Bob") @@ -70,7 +88,18 @@ class VerticesProcessorSuite { val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, 
None) + TagConfigEntry("person", + null, + null, + List(), + List(), + writeMode, + "id", + None, + null, + 10, + 10, + None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true)) @@ -139,7 +168,18 @@ class VerticesProcessorSuite { "Vertex ID: \"1\", Values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")")) val writeMode = WriteMode.INSERT val tagConfigEntryWithPrefix = - TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, "prefix", 10, 10, None) + TagConfigEntry("person", + null, + null, + List(), + List(), + writeMode, + "id", + None, + "prefix", + 10, + 10, + None) val vertexWithPrefix = processClazz.convertToVertex(row, tagConfigEntryWithPrefix, true, fieldKeys, map) assert(vertexWithPrefix.vertexID.equals("\"prefix_1\"")) diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index e1c0d14a..f4a930dc 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -72,12 +72,12 @@ object Exchange { val c: Argument = options match { case Some(config) => config case _ => - LOG.error("Argument parse failed") + LOG.error(">>>>> Argument parse failed") sys.exit(-1) } val configs = Configs.parse(c.config, c.variable, c.param) - LOG.info(s"Config ${configs}") + LOG.info(s">>>>> Config ${configs}") val session = SparkSession .builder() @@ -95,7 +95,7 @@ object Exchange { if (c.hive) { if (configs.hiveConfigEntry.isEmpty) { LOG.info( - "you don't com.vesoft.exchange.common.config hive source, so using hive tied with spark.") + ">>>>> you don't com.vesoft.exchange.common.config hive source, so using hive tied with spark.") } else { val hiveConfig = configs.hiveConfigEntry.get sparkConf.set("spark.sql.warehouse.dir", hiveConfig.warehouse) @@ -116,17 +116,28 @@ object Exchange { val spark = session.getOrCreate() // check the spark version SparkValidate.validate(spark.version, "2.4.*") + val startTime = System.currentTimeMillis() + var totalClientBatchSuccess: Long = 0L + var totalClientBatchFailure: Long = 0L + var totalClientRecordSuccess: Long = 0L + var totalClientRecordFailure: Long = 0L + var totalSstRecordSuccess: Long = 0l + var totalSstRecordFailure: Long = 0L // reload for failed import tasks - if (!c.reload.isEmpty) { + if (c.reload.nonEmpty) { val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload") + val start = System.currentTimeMillis() val data = spark.read.text(c.reload) val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) processor.process() - LOG.info(s"batchSuccess.reload: ${batchSuccess.value}") - LOG.info(s"batchFailure.reload: ${batchFailure.value}") + LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}") + LOG.info( + s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0) + .formatted("%.2f")}s") sys.exit(0) } @@ -136,13 +147,15 @@ object Exchange { // import tags if (configs.tagsConfig.nonEmpty) { for (tagConfig <- configs.tagsConfig) { - LOG.info(s"Processing Tag 
${tagConfig.name}") + LOG.info(s">>>>> Processing Tag ${tagConfig.name}") spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}") + val start = System.currentTimeMillis() + val fieldKeys = tagConfig.fields - LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") + LOG.info(s">>>>> field keys: ${fieldKeys.mkString(", ")}") val nebulaKeys = tagConfig.nebulaFields - LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}") + LOG.info(s">>>>> nebula keys: ${nebulaKeys.mkString(", ")}") val fields = tagConfig.vertexField :: tagConfig.fields val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields) @@ -155,11 +168,13 @@ object Exchange { } else { data.get } - val startTime = System.currentTimeMillis() + val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}") + val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.${tagConfig.name}") + val recordFailure = spark.sparkContext.longAccumulator(s"recordFailure.${tagConfig.name}") val processor = new VerticesProcessor( spark, @@ -169,34 +184,49 @@ object Exchange { nebulaKeys, configs, batchSuccess, - batchFailure + batchFailure, + recordSuccess, + recordFailure ) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>> import for tag ${tagConfig.name}, cost time: ${costTime}s") if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { - LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") - LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") + LOG.info( + s">>>>> Client-Import: recordSuccess.${tagConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") + LOG.info( + s">>>>> Client-Import: recordFailure.${tagConfig.name}: ${recordFailure.value}") failures += batchFailure.value + totalClientRecordSuccess += recordSuccess.value + totalClientRecordFailure += recordFailure.value + totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure += batchFailure.value } else { - LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> SST-Import: success.${tagConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> SST-Import: failure.${tagConfig.name}: ${recordFailure.value}") + totalSstRecordSuccess += recordSuccess.value + totalSstRecordFailure += recordFailure.value } } } } else { - LOG.warn("Tag is not defined") + LOG.warn(">>>>>> Tag is not defined") } // import edges if (configs.edgesConfig.nonEmpty) { for (edgeConfig <- configs.edgesConfig) { - LOG.info(s"Processing Edge ${edgeConfig.name}") + LOG.info(s">>>>> Processing Edge ${edgeConfig.name}") spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}") + val start = System.currentTimeMillis() + val fieldKeys = edgeConfig.fields - LOG.info(s"field keys: ${fieldKeys.mkString(", ")}") + LOG.info(s">>>>> field keys: ${fieldKeys.mkString(", ")}") val nebulaKeys = edgeConfig.nebulaFields - LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}") + LOG.info(s">>>>> nebula keys: ${nebulaKeys.mkString(", ")}") val fields = if 
(edgeConfig.rankingField.isDefined) { edgeConfig.rankingField.get :: edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields } else { @@ -215,9 +245,12 @@ object Exchange { df = dataUdf(df, edgeConfig.dstVertexUdf.get) } - val startTime = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}") + val recordSuccess = + spark.sparkContext.longAccumulator(s"recordSuccess.${edgeConfig.name}") + val recordFailure = + spark.sparkContext.longAccumulator(s"recordFailure.${edgeConfig.name}") val processor = new EdgeProcessor( spark, @@ -227,41 +260,63 @@ object Exchange { nebulaKeys, configs, batchSuccess, - batchFailure + batchFailure, + recordSuccess, + recordFailure ) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>> import for edge ${edgeConfig.name}, cost time: ${costTime}s") if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { - LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") - LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") + LOG.info( + s">>>>> Client-Import: recordSuccess.${edgeConfig.name}: ${recordSuccess.value}") + LOG.info(s">>>>> Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info( + s">>>>> Client-Import: recordFailure.${edgeConfig.name}: ${recordFailure.value}") failures += batchFailure.value + totalClientRecordSuccess += recordSuccess.value + totalClientRecordFailure += recordFailure.value + totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure += batchFailure.value } else { - LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}") + LOG.info(s">>>>> SST-Import: failure.${edgeConfig.name}: ${recordFailure.value}") + totalSstRecordSuccess += recordSuccess.value + totalSstRecordFailure += recordFailure.value } } } } else { - LOG.warn("Edge is not defined") + LOG.warn(">>>>> Edge is not defined") } // reimport for failed tags and edges val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}" if (failures > 0 && ErrorHandler.existError(errorPath)) { spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}") - + val start = System.currentTimeMillis() val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") val data = spark.read.text(errorPath) - val startTime = System.currentTimeMillis() val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) processor.process() - val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f") - LOG.info(s"reimport ngql cost time: ${costTime}") - LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}") - LOG.info(s"batchFailure.reimport: ${batchFailure.value}") + val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f") + LOG.info(s">>>>> reimport ngql cost time: ${costTime}") + LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}") + LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}") + 
totalClientBatchSuccess += batchSuccess.value + totalClientBatchFailure -= batchSuccess.value } spark.close() + LOG.info( + s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0) + .formatted("%.2f")}s \n" + + s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" + + s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" + + s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" + + s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" + + s">>>>>> total SST failure:${totalSstRecordFailure} \n" + + s">>>>>> total SST Success:${totalSstRecordSuccess}") } /** @@ -279,54 +334,56 @@ object Exchange { config.category match { case SourceCategory.PARQUET => val parquetConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading Parquet files from ${parquetConfig.path}""") + LOG.info(s""">>>>> Loading Parquet files from ${parquetConfig.path}""") val reader = new ParquetReader(session, parquetConfig) Some(reader.read()) case SourceCategory.ORC => val orcConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading ORC files from ${orcConfig.path}""") + LOG.info(s""">>>>> Loading ORC files from ${orcConfig.path}""") val reader = new ORCReader(session, orcConfig) Some(reader.read()) case SourceCategory.JSON => val jsonConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading JSON files from ${jsonConfig.path}""") + LOG.info(s""">>>>> Loading JSON files from ${jsonConfig.path}""") val reader = new JSONReader(session, jsonConfig) Some(reader.read()) case SourceCategory.CSV => val csvConfig = config.asInstanceOf[FileBaseSourceConfigEntry] - LOG.info(s"""Loading CSV files from ${csvConfig.path}""") + LOG.info(s""">>>>> Loading CSV files from ${csvConfig.path}""") val reader = new CSVReader(session, csvConfig) Some(reader.read()) case SourceCategory.HIVE => val hiveConfig = config.asInstanceOf[HiveSourceConfigEntry] - LOG.info(s"""Loading from Hive and exec ${hiveConfig.sentence}""") + LOG.info(s""">>>>> Loading from Hive and exec ${hiveConfig.sentence}""") val reader = new HiveReader(session, hiveConfig) Some(reader.read()) case SourceCategory.KAFKA => { val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry] - LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""") + LOG.info( + s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""") val reader = new KafkaReader(session, kafkaConfig, fields) Some(reader.read()) } case SourceCategory.NEO4J => val neo4jConfig = config.asInstanceOf[Neo4JSourceConfigEntry] - LOG.info(s"Loading from neo4j com.vesoft.exchange.common.config: ${neo4jConfig}") + LOG.info(s">>>>> Loading from neo4j com.vesoft.exchange.common.config: ${neo4jConfig}") val reader = new Neo4JReader(session, neo4jConfig) Some(reader.read()) case SourceCategory.MYSQL => val mysqlConfig = config.asInstanceOf[MySQLSourceConfigEntry] - LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}") + LOG.info(s">>>>> Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}") val reader = new MySQLReader(session, mysqlConfig) Some(reader.read()) case SourceCategory.POSTGRESQL => val postgreConfig = config.asInstanceOf[PostgreSQLSourceConfigEntry] - LOG.info(s"Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}") + LOG.info( + s">>>>> Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}") val reader = new 
PostgreSQLReader(session, postgreConfig) Some(reader.read()) case SourceCategory.PULSAR => val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry] - LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}") + LOG.info(s">>>>> Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}") val reader = new PulsarReader(session, pulsarConfig) Some(reader.read()) case SourceCategory.JANUS_GRAPH => @@ -357,7 +414,7 @@ object Exchange { Some(reader.read()) } case _ => { - LOG.error(s"Data source ${config.category} not supported") + LOG.error(s">>>>> Data source ${config.category} not supported") None } } diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index f2425602..b9861a4c 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -42,7 +42,9 @@ class EdgeProcessor(spark: SparkSession, nebulaKeys: List[String], config: Configs, batchSuccess: LongAccumulator, - batchFailure: LongAccumulator) + batchFailure: LongAccumulator, + recordSuccess:LongAccumulator, + recordFailure:LongAccumulator) extends Processor { @transient @@ -71,9 +73,11 @@ class EdgeProcessor(spark: SparkSession, val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex) if (failStatement == null) { batchSuccess.add(1) + recordSuccess.add(edge.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(edge.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -88,7 +92,7 @@ class EdgeProcessor(spark: SparkSession, s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") errorBuffer.clear() } - LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext + LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() graphProvider.close() @@ -169,7 +173,7 @@ class EdgeProcessor(spark: SparkSession, wStream .foreachBatch((edges, batchId) => { - LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.") + LOG.info(s">>>>> ${edgeConfig.name} edge start batch ${batchId}.") edges.foreachPartition(processEachPartition _) }) .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds")) @@ -419,7 +423,7 @@ class EdgeProcessor(spark: SparkSession, srcBytes) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index 507ceb08..55c3a8a9 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -51,7 +51,9 @@ class VerticesProcessor(spark: SparkSession, nebulaKeys: List[String], config: Configs, 
batchSuccess: LongAccumulator, - batchFailure: LongAccumulator) + batchFailure: LongAccumulator, + recordSuccess:LongAccumulator, + recordFailure:LongAccumulator) extends Processor { @transient @@ -79,9 +81,11 @@ class VerticesProcessor(spark: SparkSession, val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) if (failStatement == null) { batchSuccess.add(1) + recordSuccess.add(vertex.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -96,7 +100,7 @@ class VerticesProcessor(spark: SparkSession, s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") errorBuffer.clear() } - LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext + LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() graphProvider.close() @@ -181,7 +185,7 @@ class VerticesProcessor(spark: SparkSession, wStream .foreachBatch((vertexSet, batchId) => { - LOG.info(s"${tagConfig.name} tag start batch ${batchId}.") + LOG.info(s">>>>> ${tagConfig.name} tag start batch ${batchId}.") vertexSet.foreachPartition(processEachPartition _) }) .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds")) @@ -328,7 +332,7 @@ class VerticesProcessor(spark: SparkSession, val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala index 505b8443..862c6478 100644 --- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala +++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala @@ -157,9 +157,9 @@ class Neo4JReader(override val session: SparkSession, neo4jConfig: Neo4JSourceCo val offsets = getOffsets(totalCount, neo4jConfig.parallel, neo4jConfig.checkPointPath, neo4jConfig.name) - LOG.info(s"${neo4jConfig.name} offsets: ${offsets.mkString(",")}") + LOG.info(s">>>>> ${neo4jConfig.name} offsets: ${offsets.mkString(",")}") if (offsets.forall(_.size == 0L)) { - LOG.warn(s"${neo4jConfig.name} already write done from check point.") + LOG.warn(s">>>>> ${neo4jConfig.name} already write done from check point.") return session.createDataFrame(session.sparkContext.emptyRDD[Row], new StructType()) } diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index 509df544..d444b6c6 100644 --- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -16,7 +16,16 @@ import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, Schem import 
org.apache.commons.codec.binary.Hex import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -58,7 +67,7 @@ class EdgeProcessorSuite { "col14") val processClazz = - new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null) + new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null, null, null) @Test def isEdgeValidSuite(): Unit = { val stringIdValue = List("Bob", "Tom") diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index ad030f59..2560784c 100644 --- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -15,7 +15,16 @@ import com.vesoft.nebula.exchange.processor.VerticesProcessor import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -58,7 +67,16 @@ class VerticesProcessorSuite { "col14") val processClazz = - new VerticesProcessor(null, data, tagConfig, fieldKeys, nebulaKeys, config, null, null) + new VerticesProcessor(null, + data, + tagConfig, + fieldKeys, + nebulaKeys, + config, + null, + null, + null, + null) @Test def isVertexValidSuite(): Unit = { val stringIdValue = List("Bob") @@ -68,7 +86,18 @@ class VerticesProcessorSuite { val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, None) + TagConfigEntry("person", + null, + null, + List(), + List(), + writeMode, + "id", + None, + null, + 10, + 10, + None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true)) diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 62addea5..cfc8de0e 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -72,12 +72,12 @@ object Exchange { val c: Argument = options match { case Some(config) => config case _ => - LOG.error("Argument parse failed") + LOG.error(">>>>> Argument parse failed") sys.exit(-1) } val configs = Configs.parse(c.config, c.variable, c.param) 
-    LOG.info(s"Config ${configs}")
+    LOG.info(s">>>>> Config ${configs}")
 
     val session = SparkSession
       .builder()
@@ -95,7 +95,7 @@ object Exchange {
     if (c.hive) {
       if (configs.hiveConfigEntry.isEmpty) {
         LOG.info(
-          "you don't com.vesoft.exchange.common.config hive source, so using hive tied with spark.")
+          ">>>>> hive source is not configured, so use the hive tied with spark.")
       } else {
         val hiveConfig = configs.hiveConfigEntry.get
         sparkConf.set("spark.sql.warehouse.dir", hiveConfig.warehouse)
@@ -116,17 +116,30 @@ object Exchange {
     val spark = session.getOrCreate()
     // check the spark version
     SparkValidate.validate(spark.version, "3.0.*", "3.1.*", "3.2.*", "3.3.*")
+    val startTime = System.currentTimeMillis()
+    var totalClientBatchSuccess: Long = 0L
+    var totalClientBatchFailure: Long = 0L
+    var totalClientRecordSuccess: Long = 0L
+    var totalClientRecordFailure: Long = 0L
+    var totalSstRecordSuccess: Long = 0L
+    var totalSstRecordFailure: Long = 0L
 
     // reload for failed import tasks
-    if (!c.reload.isEmpty) {
+    if (c.reload.nonEmpty) {
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reload")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reload")
+      val start = System.currentTimeMillis()
 
       val data = spark.read.text(c.reload)
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
-      LOG.info(s"batchSuccess.reload: ${batchSuccess.value}")
-      LOG.info(s"batchFailure.reload: ${batchFailure.value}")
+      LOG.info(s">>>>> batchSuccess.reload: ${batchSuccess.value}")
+      LOG.info(s">>>>> batchFailure.reload: ${batchFailure.value}")
+      LOG.info(
+        s">>>>> exchange reload job finished, cost:${((System.currentTimeMillis() - start) / 1000.0)
+          .formatted("%.2f")}s")
       sys.exit(0)
     }
@@ -136,13 +149,15 @@ object Exchange {
     // import tags
     if (configs.tagsConfig.nonEmpty) {
       for (tagConfig <- configs.tagsConfig) {
-        LOG.info(s"Processing Tag ${tagConfig.name}")
+        LOG.info(s">>>>> Processing Tag ${tagConfig.name}")
         spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}")
+        val start = System.currentTimeMillis()
+
         val fieldKeys = tagConfig.fields
-        LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
+        LOG.info(s">>>>> field keys: ${fieldKeys.mkString(", ")}")
         val nebulaKeys = tagConfig.nebulaFields
-        LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
+        LOG.info(s">>>>> nebula keys: ${nebulaKeys.mkString(", ")}")
 
         val fields = tagConfig.vertexField :: tagConfig.fields
         val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
@@ -155,11 +170,12 @@ object Exchange {
         } else {
           data.get
         }
-        val startTime = System.currentTimeMillis()
         val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
         val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${tagConfig.name}")
+        val recordSuccess = spark.sparkContext.longAccumulator(s"recordSuccess.${tagConfig.name}")
+        val recordFailure = spark.sparkContext.longAccumulator(s"recordFailure.${tagConfig.name}")
 
         val processor = new VerticesProcessor(
           spark,
@@ -169,34 +185,49 @@ object Exchange {
           nebulaKeys,
           configs,
           batchSuccess,
-          batchFailure
+          batchFailure,
+          recordSuccess,
+          recordFailure
         )
         processor.process()
-        val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-        LOG.info(s"import for tag ${tagConfig.name}, cost time: ${costTime}s")
+        val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
+        LOG.info(s">>>>> import for tag ${tagConfig.name}, cost time: ${costTime}s")
         if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
-          LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
-          LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
+          LOG.info(s">>>>> Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
+          LOG.info(
+            s">>>>> Client-Import: recordSuccess.${tagConfig.name}: ${recordSuccess.value}")
+          LOG.info(s">>>>> Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
+          LOG.info(
+            s">>>>> Client-Import: recordFailure.${tagConfig.name}: ${recordFailure.value}")
           failures += batchFailure.value
+          totalClientRecordSuccess += recordSuccess.value
+          totalClientRecordFailure += recordFailure.value
+          totalClientBatchSuccess += batchSuccess.value
+          totalClientBatchFailure += batchFailure.value
         } else {
-          LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}")
+          LOG.info(s">>>>> SST-Import: success.${tagConfig.name}: ${recordSuccess.value}")
+          LOG.info(s">>>>> SST-Import: failure.${tagConfig.name}: ${recordFailure.value}")
+          totalSstRecordSuccess += recordSuccess.value
+          totalSstRecordFailure += recordFailure.value
         }
       }
     } else {
-      LOG.warn("Tag is not defined")
+      LOG.warn(">>>>> Tag is not defined")
     }
 
     // import edges
     if (configs.edgesConfig.nonEmpty) {
       for (edgeConfig <- configs.edgesConfig) {
-        LOG.info(s"Processing Edge ${edgeConfig.name}")
+        LOG.info(s">>>>> Processing Edge ${edgeConfig.name}")
         spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}")
+        val start = System.currentTimeMillis()
+
         val fieldKeys = edgeConfig.fields
-        LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
+        LOG.info(s">>>>> field keys: ${fieldKeys.mkString(", ")}")
         val nebulaKeys = edgeConfig.nebulaFields
-        LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
+        LOG.info(s">>>>> nebula keys: ${nebulaKeys.mkString(", ")}")
         val fields = if (edgeConfig.rankingField.isDefined) {
           edgeConfig.rankingField.get :: edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields
         } else {
@@ -215,9 +246,12 @@ object Exchange {
           df = dataUdf(df, edgeConfig.dstVertexUdf.get)
         }
 
-        val startTime = System.currentTimeMillis()
         val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
         val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")
+        val recordSuccess =
+          spark.sparkContext.longAccumulator(s"recordSuccess.${edgeConfig.name}")
+        val recordFailure =
+          spark.sparkContext.longAccumulator(s"recordFailure.${edgeConfig.name}")
 
         val processor = new EdgeProcessor(
           spark,
@@ -227,41 +261,63 @@ object Exchange {
           nebulaKeys,
           configs,
           batchSuccess,
-          batchFailure
+          batchFailure,
+          recordSuccess,
+          recordFailure
         )
         processor.process()
-        val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-        LOG.info(s"import for edge ${edgeConfig.name}, cost time: ${costTime}s")
+        val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
+        LOG.info(s">>>>> import for edge ${edgeConfig.name}, cost time: ${costTime}s")
         if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
-          LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
-          LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
+          LOG.info(s">>>>> Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
+          LOG.info(
+            s">>>>> Client-Import: recordSuccess.${edgeConfig.name}: ${recordSuccess.value}")
+          LOG.info(s">>>>> Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
+          LOG.info(
+            s">>>>> Client-Import: recordFailure.${edgeConfig.name}: ${recordFailure.value}")
           failures += batchFailure.value
+          totalClientRecordSuccess += recordSuccess.value
+          totalClientRecordFailure += recordFailure.value
+          totalClientBatchSuccess += batchSuccess.value
+          totalClientBatchFailure += batchFailure.value
         } else {
-          LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}")
+          LOG.info(s">>>>> SST-Import: success.${edgeConfig.name}: ${recordSuccess.value}")
+          LOG.info(s">>>>> SST-Import: failure.${edgeConfig.name}: ${recordFailure.value}")
+          totalSstRecordSuccess += recordSuccess.value
+          totalSstRecordFailure += recordFailure.value
        }
       }
     } else {
-      LOG.warn("Edge is not defined")
+      LOG.warn(">>>>> Edge is not defined")
     }
 
     // reimport for failed tags and edges
     val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
     if (failures > 0 && ErrorHandler.existError(errorPath)) {
       spark.sparkContext.setJobGroup("Reload", s"Reload: ${errorPath}")
-
+      val start = System.currentTimeMillis()
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
       val data = spark.read.text(errorPath)
-      val startTime = System.currentTimeMillis()
       val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
-      val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
-      LOG.info(s"reimport ngql cost time: ${costTime}")
-      LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
-      LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
+      val costTime = ((System.currentTimeMillis() - start) / 1000.0).formatted("%.2f")
+      LOG.info(s">>>>> reimport ngql cost time: ${costTime}s")
+      LOG.info(s">>>>> batchSuccess.reimport: ${batchSuccess.value}")
+      LOG.info(s">>>>> batchFailure.reimport: ${batchFailure.value}")
+      totalClientBatchSuccess += batchSuccess.value
+      totalClientBatchFailure -= batchSuccess.value
     }
     spark.close()
+    LOG.info(
+      s"\n>>>>>> exchange job finished, cost ${((System.currentTimeMillis() - startTime) / 1000.0)
+        .formatted("%.2f")}s \n" +
+        s">>>>>> total client batchSuccess:${totalClientBatchSuccess} \n" +
+        s">>>>>> total client recordsSuccess:${totalClientRecordSuccess} \n" +
+        s">>>>>> total client batchFailure:${totalClientBatchFailure} \n" +
+        s">>>>>> total client recordsFailure:${totalClientRecordFailure} \n" +
+        s">>>>>> total SST success:${totalSstRecordSuccess} \n" +
+        s">>>>>> total SST failure:${totalSstRecordFailure}")
   }
 
   /**
@@ -279,54 +335,54 @@ object Exchange {
     config.category match {
       case SourceCategory.PARQUET =>
         val parquetConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
-        LOG.info(s"""Loading Parquet files from ${parquetConfig.path}""")
+        LOG.info(s""">>>>> Loading Parquet files from ${parquetConfig.path}""")
         val reader = new ParquetReader(session, parquetConfig)
         Some(reader.read())
       case SourceCategory.ORC =>
         val orcConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
-        LOG.info(s"""Loading ORC files from ${orcConfig.path}""")
+        LOG.info(s""">>>>> Loading ORC files from ${orcConfig.path}""")
         val reader = new ORCReader(session, orcConfig)
         Some(reader.read())
       case SourceCategory.JSON =>
         val jsonConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
-        LOG.info(s"""Loading JSON files from ${jsonConfig.path}""")
+        LOG.info(s""">>>>> Loading JSON files from ${jsonConfig.path}""")
         val reader = new JSONReader(session, jsonConfig)
         Some(reader.read())
       case SourceCategory.CSV =>
         val csvConfig = config.asInstanceOf[FileBaseSourceConfigEntry]
-        LOG.info(s"""Loading CSV files from ${csvConfig.path}""")
+        LOG.info(s""">>>>> Loading CSV files from ${csvConfig.path}""")
         val reader = new CSVReader(session, csvConfig)
         Some(reader.read())
       case SourceCategory.HIVE =>
        val hiveConfig = config.asInstanceOf[HiveSourceConfigEntry]
-        LOG.info(s"""Loading from Hive and exec ${hiveConfig.sentence}""")
+        LOG.info(s""">>>>> Loading from Hive and exec ${hiveConfig.sentence}""")
         val reader = new HiveReader(session, hiveConfig)
         Some(reader.read())
       case SourceCategory.KAFKA => {
         val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
-        LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
+        LOG.info(s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
         val reader = new KafkaReader(session, kafkaConfig, fields)
         Some(reader.read())
       }
       case SourceCategory.NEO4J =>
         val neo4jConfig = config.asInstanceOf[Neo4JSourceConfigEntry]
-        LOG.info(s"Loading from neo4j com.vesoft.exchange.common.config: ${neo4jConfig}")
+        LOG.info(s">>>>> Loading from neo4j config: ${neo4jConfig}")
         val reader = new Neo4JReader(session, neo4jConfig)
         Some(reader.read())
       case SourceCategory.MYSQL =>
         val mysqlConfig = config.asInstanceOf[MySQLSourceConfigEntry]
-        LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}")
+        LOG.info(s">>>>> Loading from mysql config: ${mysqlConfig}")
         val reader = new MySQLReader(session, mysqlConfig)
         Some(reader.read())
       case SourceCategory.POSTGRESQL =>
         val postgreConfig = config.asInstanceOf[PostgreSQLSourceConfigEntry]
-        LOG.info(s"Loading from postgre com.vesoft.exchange.common.config: ${postgreConfig}")
+        LOG.info(s">>>>> Loading from postgre config: ${postgreConfig}")
         val reader = new PostgreSQLReader(session, postgreConfig)
         Some(reader.read())
       case SourceCategory.PULSAR =>
         val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry]
-        LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}")
+        LOG.info(s">>>>> Loading from pulsar config: ${pulsarConfig}")
         val reader = new PulsarReader(session, pulsarConfig)
         Some(reader.read())
       case SourceCategory.JANUS_GRAPH =>
@@ -357,7 +413,7 @@ object Exchange {
         Some(reader.read())
       }
       case _ => {
-        LOG.error(s"Data source ${config.category} not supported")
+        LOG.error(s">>>>> Data source ${config.category} not supported")
         None
       }
     }
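For reviewers skimming the Exchange.scala changes above: the patch now counts at two granularities, batches (one writer call per batch) and records (rows inside each batch), using Spark LongAccumulators that executors update inside partitions and the driver reads back for the final summary. Below is a minimal, self-contained sketch of that pattern. It is not Exchange code: `writeBatch` is a hypothetical stand-in for the real writer (writer.writeVertices / writer.writeEdges), and the local master, accumulator names, and failure rule are invented for the demo.

    import org.apache.spark.sql.SparkSession

    object AccumulatorCountingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        val sc    = spark.sparkContext

        // One accumulator per granularity, mirroring batchSuccess/recordSuccess above.
        val batchSuccess  = sc.longAccumulator("batchSuccess.example")
        val batchFailure  = sc.longAccumulator("batchFailure.example")
        val recordSuccess = sc.longAccumulator("recordSuccess.example")
        val recordFailure = sc.longAccumulator("recordFailure.example")

        // Hypothetical writer: pretend batches whose first element is > 50 fail.
        val writeBatch: Seq[Int] => Boolean = batch => batch.head <= 50

        sc.parallelize(1 to 100, numSlices = 4).foreachPartition { iter =>
          // Group records into fixed-size batches, as the processors do with config.batch.
          iter.grouped(10).foreach { batch =>
            if (writeBatch(batch)) {
              batchSuccess.add(1)
              recordSuccess.add(batch.size) // records, not batches
            } else {
              batchFailure.add(1)
              recordFailure.add(batch.size)
            }
          }
        }

        // Accumulator values are only reliable on the driver, after the action completes.
        println(s"batches ok/failed: ${batchSuccess.value}/${batchFailure.value}")
        println(s"records ok/failed: ${recordSuccess.value}/${recordFailure.value}")
        spark.stop()
      }
    }

Batch counts alone hide how much data each batch carried (the last batch in a partition is usually short), which is why the patch reports both levels in the job summary.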
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 0b3c4629..334274b6 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -42,7 +42,9 @@ class EdgeProcessor(spark: SparkSession,
                     nebulaKeys: List[String],
                     config: Configs,
                     batchSuccess: LongAccumulator,
-                    batchFailure: LongAccumulator)
+                    batchFailure: LongAccumulator,
+                    recordSuccess: LongAccumulator,
+                    recordFailure: LongAccumulator)
     extends Processor {
 
   @transient
@@ -71,9 +73,12 @@ class EdgeProcessor(spark: SparkSession,
         val failStatement = writer.writeEdges(edges, edgeConfig.ignoreIndex)
         if (failStatement == null) {
batchSuccess.add(1) + recordSuccess.add(edge.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(edge.toList.size) + if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -88,7 +93,7 @@ class EdgeProcessor(spark: SparkSession, s"${config.errorConfig.errorPath}/${appId}/${edgeConfig.name}.${TaskContext.getPartitionId}") errorBuffer.clear() } - LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext + LOG.info(s">>>>> edge ${edgeConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() graphProvider.close() @@ -168,7 +173,7 @@ class EdgeProcessor(spark: SparkSession, wStream .foreachBatch((edges: Dataset[Edge], batchId: Long) => { - LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.") + LOG.info(s">>>>> ${edgeConfig.name} edge start batch ${batchId}.") edges.foreachPartition(processEachPartition _) }) .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds")) @@ -418,7 +423,7 @@ class EdgeProcessor(spark: SparkSession, srcBytes) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index ee9a0958..1fe43ccb 100644 --- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -51,7 +51,9 @@ class VerticesProcessor(spark: SparkSession, nebulaKeys: List[String], config: Configs, batchSuccess: LongAccumulator, - batchFailure: LongAccumulator) + batchFailure: LongAccumulator, + recordSuccess: LongAccumulator, + recordFailure: LongAccumulator) extends Processor { @transient @@ -79,9 +81,11 @@ class VerticesProcessor(spark: SparkSession, val failStatement = writer.writeVertices(vertices, tagConfig.ignoreIndex) if (failStatement == null) { batchSuccess.add(1) + recordSuccess.add(vertex.toList.size) } else { errorBuffer.append(failStatement) batchFailure.add(1) + recordFailure.add(vertex.toList.size) if (batchFailure.value >= config.errorConfig.errorMaxSize) { throw TooManyErrorsException( s"There are too many failed batches, batch amount: ${batchFailure.value}, " + @@ -96,7 +100,7 @@ class VerticesProcessor(spark: SparkSession, s"${config.errorConfig.errorPath}/${appId}/${tagConfig.name}.${TaskContext.getPartitionId()}") errorBuffer.clear() } - LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext + LOG.info(s">>>>> tag ${tagConfig.name} import in spark partition ${TaskContext .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() graphProvider.close() @@ -186,7 +190,7 @@ class VerticesProcessor(spark: SparkSession, wStream .foreachBatch((vertexSet: Dataset[Vertex], batchId: Long) => { - LOG.info(s"${tagConfig.name} tag start batch ${batchId}.") + LOG.info(s">>>>> ${tagConfig.name} tag start batch ${batchId}.") vertexSet.foreachPartition(processEachPartition _) }) .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} 
seconds")) @@ -333,7 +337,7 @@ class VerticesProcessor(spark: SparkSession, val vertexKey = codec.vertexKey(spaceVidLen, partitionId, vidBytes, tagItem.getTag_id) val values = for { - property <- fieldKeys if property.trim.length != 0 + property <- fieldKeys if property.trim.nonEmpty } yield extraValueForSST(row, property, fieldTypeMap) .asInstanceOf[AnyRef] diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index e7cea2cb..32665e64 100644 --- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -14,7 +14,16 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, SchemaProp} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -57,7 +66,7 @@ class EdgeProcessorSuite { "col14") val processClazz = - new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null) + new EdgeProcessor(null, data, edgeConfig, fieldKeys, nebulaKeys, config, null, null, null, null) @Test def isEdgeValidSuite(): Unit = { val stringIdValue = List("Bob", "Tom") diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index 205545bc..54e39d29 100644 --- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -14,7 +14,16 @@ import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ + BooleanType, + DoubleType, + IntegerType, + LongType, + ShortType, + StringType, + StructField, + StructType +} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -57,7 +66,16 @@ class VerticesProcessorSuite { "col14") val processClazz = - new VerticesProcessor(null, data, tagConfig, fieldKeys, nebulaKeys, config, null, null) + new VerticesProcessor(null, + data, + tagConfig, + fieldKeys, + nebulaKeys, + config, + null, + null, + null, + null) @Test def isVertexValidSuite(): Unit = { val stringIdValue = List("Bob") @@ -65,9 +83,20 @@ class VerticesProcessorSuite { val schema: StructType = StructType(List(StructField("id", StringType, nullable = true))) val stringIdRow = new 
GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) - val writeMode = WriteMode.INSERT + val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, None) + TagConfigEntry("person", + null, + null, + List(), + List(), + writeMode, + "id", + None, + null, + 10, + 10, + None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true))
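
The processor changes above also show the error-handling contract the new record counters plug into: failed statements accumulate in a partition-local buffer, and the partition aborts with TooManyErrorsException once failures reach errorMaxSize. The sketch below is a simplified rendering of that control flow, not the real processor: `writeBatch` and `saveErrors` are hypothetical stand-ins for the writer and ErrorHandler.save, and here the buffer is persisted before aborting.

    import scala.collection.mutable.ArrayBuffer

    final case class TooManyErrorsException(message: String) extends Exception(message)

    object ErrorBufferSketch {
      // Processes one partition's batches; returns (succeededBatches, failedBatches).
      def processPartition(batches: Iterator[List[String]],
                           errorMaxSize: Int,
                           writeBatch: List[String] => String, // null on success, else the failed statement
                           saveErrors: Seq[String] => Unit): (Long, Long) = {
        val errorBuffer  = ArrayBuffer[String]()
        var batchSuccess = 0L
        var batchFailure = 0L
        batches.foreach { batch =>
          val failStatement = writeBatch(batch)
          if (failStatement == null) {
            batchSuccess += 1
          } else {
            errorBuffer.append(failStatement)
            batchFailure += 1
            // Too many failed batches: persist what was buffered, then abort the partition.
            if (batchFailure >= errorMaxSize) {
              saveErrors(errorBuffer.toSeq)
              throw TooManyErrorsException(
                s"There are too many failed batches, batch amount: $batchFailure, " +
                  s"error max size: $errorMaxSize")
            }
          }
        }
        // Persist remaining failures so a later reload can retry them.
        if (errorBuffer.nonEmpty) {
          saveErrors(errorBuffer.toSeq)
          errorBuffer.clear()
        }
        (batchSuccess, batchFailure)
      }
    }

The persisted statements are exactly what the reload path at the top of main consumes: spark.read.text(c.reload) re-reads them line by line and ReloadProcessor replays each one, updating the reload accumulators reported in the job summary.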