From b8e09b9ad0203518cb30efdac4a6056262a01c0a Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Wed, 16 Jun 2021 19:14:40 +0800 Subject: [PATCH 1/5] clean the error path before reload --- .../vesoft/nebula/exchange/ErrorHandler.scala | 12 ++++++ .../exchange/processor/ReloadProcessor.scala | 37 +++++++++++++------ .../exchange/writer/ServerBaseWriter.scala | 18 +++++++++ 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala index 056d6545..dfdd44a5 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala @@ -11,6 +11,18 @@ import org.apache.hadoop.fs.{FileSystem, Path} import scala.collection.mutable.ArrayBuffer object ErrorHandler { + + /** + * clean all the failed data for error path before reload. + */ + def clear(path: String): Unit = { + val fileSystem = FileSystem.get(new Configuration()) + fileSystem.removeAcl(new Path(path)) + } + + /** + * save the failed execute statement. + */ def save(buffer: ArrayBuffer[String], path: String): Unit = { val fileSystem = FileSystem.get(new Configuration()) val errors = fileSystem.create(new Path(path)) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala index 2bc353b3..b91d8098 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala @@ -8,6 +8,8 @@ package com.vesoft.nebula.exchange.processor import com.vesoft.nebula.exchange.{ErrorHandler, GraphProvider} import com.vesoft.nebula.exchange.config.Configs +import com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter +import org.apache.log4j.Logger import org.apache.spark.TaskContext import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.LongAccumulator @@ -19,6 +21,7 @@ class ReloadProcessor(data: DataFrame, batchSuccess: LongAccumulator, batchFailure: LongAccumulator) extends Processor { + private val LOG = Logger.getLogger(this.getClass) override def process(): Unit = { data.foreachPartition(processEachPartition(_)) @@ -27,27 +30,37 @@ class ReloadProcessor(data: DataFrame, private def processEachPartition(iterator: Iterator[Row]): Unit = { val graphProvider = new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout) - val session = graphProvider.getGraphClient(config.userConfig) - if (session == null) { - throw new IllegalArgumentException("connect to graph failed.") - } + + val writer = new NebulaGraphClientWriter(config.databaseConfig, + config.userConfig, + config.rateConfig, + null, + graphProvider) val errorBuffer = ArrayBuffer[String]() + ErrorHandler.clear(config.errorConfig.errorPath) - iterator.foreach(row => { - val exec = row.getString(0) - val result = session.execute(exec) - if (result == null || !result.isSucceeded) { - errorBuffer.append(exec) - batchFailure.add(1) - } else { + writer.prepare() + // batch write tags + val startTime = System.currentTimeMillis + iterator.foreach { row => + val failStatement = writer.writeNgql(row.getString(0)) + if (failStatement == null) { batchSuccess.add(1) + } else { + errorBuffer.append(failStatement) + batchFailure.add(1) } - }) + } if (errorBuffer.nonEmpty) { ErrorHandler.save(errorBuffer, s"${config.errorConfig.errorPath}/reload.${TaskContext.getPartitionId()}") errorBuffer.clear() } + LOG.info( + s"spark partition for vertex reload time:" + + s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}") + writer.close() + graphProvider.close() } } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala index be25ba5d..cd674639 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala @@ -111,6 +111,8 @@ abstract class ServerBaseWriter extends Writer { def writeVertices(vertices: Vertices): String def writeEdges(edges: Edges): String + + def writeNgql(ngql: String): String } /** @@ -173,6 +175,20 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, sentence } + override def writeNgql(ngql: String): String = { + if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { + val result = graphProvider.submit(session, ngql) + if (result.isSucceeded) { + return null + } + LOG.error(s"rewrite ngql failed for ${result.getErrorMessage}") + } else { + LOG.error(s"rewrite ngql failed because write speed is too fast") + } + LOG.info(ngql) + ngql + } + override def close(): Unit = { graphProvider.releaseGraphClient(session) } @@ -241,5 +257,7 @@ class NebulaStorageClientWriter(addresses: List[(String, Int)], space: String) override def writeEdges(edges: Edges): String = ??? + override def writeNgql(ngql: String): String = ??? + override def close(): Unit = {} } From b088ec35ab9550dc311adc596e8f348ca1126b03 Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Fri, 25 Jun 2021 16:42:12 +0800 Subject: [PATCH 2/5] add more comment --- .../main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala index dfdd44a5..7a274400 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala @@ -37,6 +37,9 @@ object ErrorHandler { } } + /** + * check if path exists + */ def existError(path: String): Boolean = { val fileSystem = FileSystem.get(new Configuration()) fileSystem.exists(new Path(path)) From 1af6bdea2b1bcb94815ea35212a57750e946d985 Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Wed, 30 Jun 2021 15:22:20 +0800 Subject: [PATCH 3/5] optimize reload --- .../src/main/resources/application.conf | 4 ++-- .../vesoft/nebula/exchange/ErrorHandler.scala | 21 +++++++++++++++++-- .../com/vesoft/nebula/exchange/Exchange.scala | 10 +++++++-- .../exchange/processor/EdgeProcessor.scala | 5 ++--- .../exchange/processor/ReloadProcessor.scala | 12 +++++------ .../processor/VerticesProcessor.scala | 5 ++--- .../exchange/writer/ServerBaseWriter.scala | 4 ++-- 7 files changed, 41 insertions(+), 20 deletions(-) diff --git a/nebula-exchange/src/main/resources/application.conf b/nebula-exchange/src/main/resources/application.conf index a64d625b..288b34c0 100644 --- a/nebula-exchange/src/main/resources/application.conf +++ b/nebula-exchange/src/main/resources/application.conf @@ -39,8 +39,8 @@ graph:["127.0.0.1:9669"] meta:["127.0.0.1:9559"] } - user: user - pswd: password + user: root + pswd: nebula space: test # parameters for SST import, not required diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala index 7a274400..8803544a 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala @@ -6,8 +6,11 @@ package com.vesoft.nebula.exchange +import java.io.File + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} + import scala.collection.mutable.ArrayBuffer object ErrorHandler { @@ -16,8 +19,22 @@ object ErrorHandler { * clean all the failed data for error path before reload. */ def clear(path: String): Unit = { - val fileSystem = FileSystem.get(new Configuration()) - fileSystem.removeAcl(new Path(path)) + if (path.startsWith("hdfs://")) { + val fileSystem = FileSystem.get(new Configuration()) + fileSystem.removeAcl(new Path(path)) + } else { + val directory = new File(path) + if (directory.exists()) { + val content: Array[String] = directory.list() + for (fileName <- content) { + if (!fileName.startsWith("reload.")) { + val tmp = new File(path, fileName) + tmp.delete() + } + } + } + } + } /** diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 696d34a7..c1ba0379 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -117,6 +117,9 @@ object Exchange { sys.exit(0) } + // record the failed batch number + var failures: Long = 0L + // import tags if (configs.tagsConfig.nonEmpty) { for (tagConfig <- configs.tagsConfig) { @@ -146,6 +149,7 @@ object Exchange { if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}") + failures += batchFailure.value } else { LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}") } @@ -182,6 +186,7 @@ object Exchange { if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) { LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}") LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}") + failures += batchFailure.value } else { LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}") } @@ -192,11 +197,12 @@ object Exchange { } // reimport for failed tags and edges - if (ErrorHandler.existError(configs.errorConfig.errorPath)) { + if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) { val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport") val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport") val data = spark.read.text(configs.errorConfig.errorPath) - val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) + data.count() + val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure) processor.process() LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}") LOG.info(s"batchFailure.reimport: ${batchFailure.value}") diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 91a98bfa..ab82d635 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -85,9 +85,8 @@ class EdgeProcessor(data: DataFrame, s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}") errorBuffer.clear() } - LOG.info( - s"spark partition for edge cost time:" + - s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}") + LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext + .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() graphProvider.close() } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala index b91d8098..62fedd9d 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala @@ -21,10 +21,12 @@ class ReloadProcessor(data: DataFrame, batchSuccess: LongAccumulator, batchFailure: LongAccumulator) extends Processor { - private val LOG = Logger.getLogger(this.getClass) + @transient + private[this] lazy val LOG = Logger.getLogger(this.getClass) override def process(): Unit = { data.foreachPartition(processEachPartition(_)) + ErrorHandler.clear(config.errorConfig.errorPath) } private def processEachPartition(iterator: Iterator[Row]): Unit = { @@ -38,10 +40,9 @@ class ReloadProcessor(data: DataFrame, graphProvider) val errorBuffer = ArrayBuffer[String]() - ErrorHandler.clear(config.errorConfig.errorPath) writer.prepare() - // batch write tags + // batch write val startTime = System.currentTimeMillis iterator.foreach { row => val failStatement = writer.writeNgql(row.getString(0)) @@ -57,9 +58,8 @@ class ReloadProcessor(data: DataFrame, s"${config.errorConfig.errorPath}/reload.${TaskContext.getPartitionId()}") errorBuffer.clear() } - LOG.info( - s"spark partition for vertex reload time:" + - s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}") + LOG.info(s"data reload in partition ${TaskContext + .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms") writer.close() graphProvider.close() } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala index f6568ab8..7a24a3c3 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala @@ -93,9 +93,8 @@ class VerticesProcessor(data: DataFrame, s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}") errorBuffer.clear() } - LOG.info( - s"spark partition for vertex cost time:" + - s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}") + LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext + .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms") writer.close() graphProvider.close() } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala index cd674639..e0bc2d4e 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala @@ -181,9 +181,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, if (result.isSucceeded) { return null } - LOG.error(s"rewrite ngql failed for ${result.getErrorMessage}") + LOG.error(s"reimport ngql failed for ${result.getErrorMessage}") } else { - LOG.error(s"rewrite ngql failed because write speed is too fast") + LOG.error(s"reimport ngql failed because write speed is too fast") } LOG.info(ngql) ngql From 42766a07ff5f8d995e5c5d667d2d9f0f7ea4ed78 Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Thu, 1 Jul 2021 15:25:14 +0800 Subject: [PATCH 4/5] clean the error path before reimport --- .../vesoft/nebula/exchange/ErrorHandler.scala | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala index 8803544a..4917bc93 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala @@ -6,41 +6,47 @@ package com.vesoft.nebula.exchange -import java.io.File - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.log4j.Logger import scala.collection.mutable.ArrayBuffer object ErrorHandler { + @transient + private[this] val LOG = Logger.getLogger(this.getClass) /** * clean all the failed data for error path before reload. + * + * @param path path to clean */ def clear(path: String): Unit = { - if (path.startsWith("hdfs://")) { - val fileSystem = FileSystem.get(new Configuration()) - fileSystem.removeAcl(new Path(path)) - } else { - val directory = new File(path) - if (directory.exists()) { - val content: Array[String] = directory.list() - for (fileName <- content) { - if (!fileName.startsWith("reload.")) { - val tmp = new File(path, fileName) - tmp.delete() - } + try { + val fileSystem = FileSystem.get(new Configuration()) + val filesStatus = fileSystem.listStatus(new Path(path)) + for (file <- filesStatus) { + if (!file.getPath.getName.startsWith("reload.")) { + fileSystem.delete(file.getPath, true) } } + } catch { + case e: Throwable => { + LOG.error(s"$path cannot be clean, but this error does not affect the import result, " + + s"you can only focus on the reload files.", + e) + } } - } /** * save the failed execute statement. + * + * @param buffer buffer saved failed ngql + * @param path path to write these buffer ngql */ def save(buffer: ArrayBuffer[String], path: String): Unit = { + LOG.info(s"create reload path $path") val fileSystem = FileSystem.get(new Configuration()) val errors = fileSystem.create(new Path(path)) @@ -56,6 +62,9 @@ object ErrorHandler { /** * check if path exists + * + * @param path error path + *@return true if path exists */ def existError(path: String): Boolean = { val fileSystem = FileSystem.get(new Configuration()) From 5be083b95444aef0190da15f1432edca4a22437e Mon Sep 17 00:00:00 2001 From: Nicole00 <16240361+Nicole00@users.noreply.github.com> Date: Fri, 2 Jul 2021 12:43:26 +0800 Subject: [PATCH 5/5] remove the clear operation --- .../com/vesoft/nebula/exchange/processor/ReloadProcessor.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala index 62fedd9d..29401e05 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala @@ -26,7 +26,6 @@ class ReloadProcessor(data: DataFrame, override def process(): Unit = { data.foreachPartition(processEachPartition(_)) - ErrorHandler.clear(config.errorConfig.errorPath) } private def processEachPartition(iterator: Iterator[Row]): Unit = {