diff --git a/nebula-exchange/src/main/resources/application.conf b/nebula-exchange/src/main/resources/application.conf
index 6b866dbd..33a3cd96 100644
--- a/nebula-exchange/src/main/resources/application.conf
+++ b/nebula-exchange/src/main/resources/application.conf
@@ -39,8 +39,8 @@
       graph:["127.0.0.1:9669"]
       meta:["127.0.0.1:9559"]
     }
-    user: user
-    pswd: password
+    user: root
+    pswd: nebula
     space: test
 
     # parameters for SST import, not required
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
index 056d6545..4917bc93 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
@@ -8,10 +8,45 @@ package com.vesoft.nebula.exchange
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.log4j.Logger
+
 import scala.collection.mutable.ArrayBuffer
 
 object ErrorHandler {
 
+  @transient
+  private[this] val LOG = Logger.getLogger(this.getClass)
+
+  /**
+    * clean up all the failed data in the error path before reloading.
+    *
+    * @param path path to clean
+    */
+  def clear(path: String): Unit = {
+    try {
+      val fileSystem  = FileSystem.get(new Configuration())
+      val filesStatus = fileSystem.listStatus(new Path(path))
+      for (file <- filesStatus) {
+        if (!file.getPath.getName.startsWith("reload.")) {
+          fileSystem.delete(file.getPath, true)
+        }
+      }
+    } catch {
+      case e: Throwable => {
+        LOG.error(s"$path cannot be cleaned, but this error does not affect the import result, " +
+                    s"you only need to pay attention to the reload files.",
+                  e)
+      }
+    }
+  }
+
+  /**
+    * save the statements that failed to execute.
+    *
+    * @param buffer buffer holding the failed ngql statements
+    * @param path   path to write the failed ngql to
+    */
   def save(buffer: ArrayBuffer[String], path: String): Unit = {
+    LOG.info(s"create reload path $path")
     val fileSystem = FileSystem.get(new Configuration())
     val errors     = fileSystem.create(new Path(path))
@@ -25,6 +60,12 @@ object ErrorHandler {
     }
   }
 
+  /**
+    * check if path exists.
+    *
+    * @param path error path
+    * @return true if path exists
+    */
   def existError(path: String): Boolean = {
     val fileSystem = FileSystem.get(new Configuration())
     fileSystem.exists(new Path(path))
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index d2457b0e..21ffcc7c 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -121,6 +121,9 @@ object Exchange {
       sys.exit(0)
     }
 
+    // record the number of failed batches
+    var failures: Long = 0L
+
     // import tags
     if (configs.tagsConfig.nonEmpty) {
       for (tagConfig <- configs.tagsConfig) {
@@ -150,6 +153,7 @@ object Exchange {
         if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
           LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
           LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
+          failures += batchFailure.value
         } else {
           LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}")
         }
@@ -186,6 +190,7 @@ object Exchange {
         if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
           LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
           LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
+          failures += batchFailure.value
         } else {
           LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}")
         }
@@ -196,11 +201,12 @@ object Exchange {
     }
 
     // reimport for failed tags and edges
-    if (ErrorHandler.existError(configs.errorConfig.errorPath)) {
+    if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
       val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
       val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
       val data         = spark.read.text(configs.errorConfig.errorPath)
-      val processor    = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+      data.count()
+      val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
       processor.process()
       LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
       LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 91a98bfa..ab82d635 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -85,9 +85,8 @@ class EdgeProcessor(data: DataFrame,
           s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}")
         errorBuffer.clear()
       }
-      LOG.info(
-        s"spark partition for edge cost time:" +
-          s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}")
+      LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
+        .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
       writer.close()
       graphProvider.close()
     }
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala
index 2bc353b3..29401e05 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/ReloadProcessor.scala
@@ -8,6 +8,8 @@ package com.vesoft.nebula.exchange.processor
 
 import com.vesoft.nebula.exchange.{ErrorHandler, GraphProvider}
 import com.vesoft.nebula.exchange.config.Configs
+import com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter
+import org.apache.log4j.Logger
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.LongAccumulator
@@ -19,6 +21,8 @@ class ReloadProcessor(data: DataFrame,
                       batchSuccess: LongAccumulator,
                       batchFailure: LongAccumulator)
     extends Processor {
+  @transient
+  private[this] lazy val LOG = Logger.getLogger(this.getClass)
 
   override def process(): Unit = {
     data.foreachPartition(processEachPartition(_))
@@ -27,27 +31,35 @@ class ReloadProcessor(data: DataFrame,
   private def processEachPartition(iterator: Iterator[Row]): Unit = {
     val graphProvider =
       new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout)
-    val session = graphProvider.getGraphClient(config.userConfig)
-    if (session == null) {
-      throw new IllegalArgumentException("connect to graph failed.")
-    }
+
+    val writer = new NebulaGraphClientWriter(config.databaseConfig,
+                                             config.userConfig,
+                                             config.rateConfig,
+                                             null,
+                                             graphProvider)
 
     val errorBuffer = ArrayBuffer[String]()
 
-    iterator.foreach(row => {
-      val exec = row.getString(0)
-      val result = session.execute(exec)
-      if (result == null || !result.isSucceeded) {
-        errorBuffer.append(exec)
-        batchFailure.add(1)
-      } else {
+    writer.prepare()
+    // batch write
+    val startTime = System.currentTimeMillis
+    iterator.foreach { row =>
+      val failStatement = writer.writeNgql(row.getString(0))
+      if (failStatement == null) {
         batchSuccess.add(1)
+      } else {
+        errorBuffer.append(failStatement)
+        batchFailure.add(1)
       }
-    })
+    }
     if (errorBuffer.nonEmpty) {
       ErrorHandler.save(errorBuffer,
                         s"${config.errorConfig.errorPath}/reload.${TaskContext.getPartitionId()}")
       errorBuffer.clear()
     }
+    LOG.info(s"data reload in partition ${TaskContext
+      .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
+    writer.close()
+    graphProvider.close()
   }
 }
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index f6568ab8..7a24a3c3 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -93,9 +93,8 @@ class VerticesProcessor(data: DataFrame,
           s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
         errorBuffer.clear()
       }
-      LOG.info(
-        s"spark partition for vertex cost time:" +
-          s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}")
+      LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
+        .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
       writer.close()
       graphProvider.close()
     }
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
index 11295a4b..e469dd8a 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala
@@ -111,6 +111,8 @@ abstract class ServerBaseWriter extends Writer {
   def writeVertices(vertices: Vertices): String
 
   def writeEdges(edges: Edges): String
+
+  def writeNgql(ngql: String): String
 }
 
 /**
@@ -173,6 +175,20 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
     sentence
   }
 
+  override def writeNgql(ngql: String): String = {
+    if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
+      val result = graphProvider.submit(session, ngql)
+      if (result.isSucceeded) {
+        return null
+      }
+      LOG.error(s"reimport ngql failed: ${result.getErrorMessage}")
+    } else {
+      LOG.error(s"reimport ngql failed because the write speed is too fast")
+    }
+    LOG.info(ngql)
+    ngql
+  }
+
   override def close(): Unit = {
     graphProvider.releaseGraphClient(session)
   }
@@ -241,5 +257,7 @@ class NebulaStorageClientWriter(addresses: List[(String, Int)], space: String)
 
   override def writeEdges(edges: Edges): String = ???
 
+  override def writeNgql(ngql: String): String = ???
+
   override def close(): Unit = {}
 }
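
The reimport path above is easier to follow outside of Spark. Below is a minimal, self-contained sketch of the contract this patch establishes: `writeNgql` returns `null` on success and hands the statement back on failure, failures are counted during the run, and the reimport pass is gated on `failures > 0` so that leftover reload files from an earlier run are not re-executed. `writeNgql` here and the sample statements are hypothetical stand-ins for `NebulaGraphClientWriter` and real nGQL; only the control flow mirrors the diff.

```scala
import scala.collection.mutable.ArrayBuffer

object ReimportFlowSketch {
  // Hypothetical stand-in for NebulaGraphClientWriter.writeNgql:
  // null means the statement was written, non-null hands back the failed statement.
  private def writeNgql(ngql: String): String =
    if (ngql.contains("bad")) ngql else null

  def main(args: Array[String]): Unit = {
    var failures: Long = 0L              // mirrors the new counter in Exchange
    val errorBuffer    = ArrayBuffer[String]()

    val statements = Seq("INSERT VERTEX ok_1 ...", "INSERT VERTEX bad_1 ...")
    statements.foreach { stmt =>
      val failed = writeNgql(stmt)
      if (failed == null) {
        // batchSuccess.add(1) in the real code
      } else {
        errorBuffer.append(failed)       // later flushed via ErrorHandler.save
        failures += 1                    // batchFailure accumulator in the real code
      }
    }

    // Reimport only when this run actually produced failures and errors were
    // recorded, matching: if (failures > 0 && ErrorHandler.existError(...)) { ... }
    if (failures > 0 && errorBuffer.nonEmpty) {
      errorBuffer.foreach(stmt => println(s"reimport: $stmt"))
    }
  }
}
```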
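`writeNgql` itself combines two failure modes: the server rejecting the statement and the client-side rate limiter timing out. The sketch below isolates that pattern, assuming Guava's `RateLimiter` (which matches the `tryAcquire(timeout, TimeUnit.MILLISECONDS)` call in the diff); `submit` is a hypothetical stand-in for `graphProvider.submit(session, ngql)`, and the permit rate and timeout are made-up values, not Exchange's defaults.

```scala
import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.RateLimiter

object WriteNgqlSketch {
  private val rateLimiter = RateLimiter.create(1024.0) // permits per second (assumed value)
  private val timeoutMs   = 100L                       // stand-in for rateConfig.timeout

  // Hypothetical stand-in for graphProvider.submit(session, ngql).
  private def submit(ngql: String): Boolean = !ngql.contains("bad")

  /** Returns null on success, or the statement itself so the caller can buffer it. */
  def writeNgql(ngql: String): String = {
    if (rateLimiter.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
      if (submit(ngql)) {
        return null
      }
      println("reimport ngql failed: server rejected the statement")
    } else {
      println("reimport ngql failed because the write speed is too fast")
    }
    ngql // failed statement goes back to the caller, ending up in a reload.<partition> file
  }

  def main(args: Array[String]): Unit = {
    Seq("INSERT EDGE ok ...", "INSERT EDGE bad ...").foreach { s =>
      println(s"writeNgql -> ${writeNgql(s)}")
    }
  }
}
```

Returning the failed statement rather than a boolean lets ReloadProcessor buffer exactly what needs replaying, which is why the reload files can be fed straight back through `spark.read.text`.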