This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Commit

Merge pull request #104 from Nicole00/add_switch_space_for_reload
add switch space for reload
Nicole00 authored Jul 21, 2021
2 parents 64c4f70 + 6e789f7 commit b5f60c1
Showing 7 changed files with 97 additions and 22 deletions.
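In short, as read from the diff below: Exchange.scala now counts failed batches and only runs the reimport pass when failures > 0 and error files actually exist. ReloadProcessor no longer executes failed statements through a raw graph session; it goes through a NebulaGraphClientWriter, whose new writeNgql method rate-limits and re-submits each statement, and whose prepare() step presumably switches the session to the configured space (hence the PR title; prepare()'s body is not part of this diff). A new ErrorHandler.clear helper removes everything in the error path except the reload.* files, and the sample application.conf switches its placeholder credentials to root/nebula.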
4 changes: 2 additions & 2 deletions nebula-exchange/src/main/resources/application.conf
@@ -39,8 +39,8 @@
graph:["127.0.0.1:9669"]
meta:["127.0.0.1:9559"]
}
- user: user
- pswd: password
+ user: root
+ pswd: nebula
space: test

# parameters for SST import, not required
ErrorHandler.scala
@@ -8,10 +8,45 @@ package com.vesoft.nebula.exchange

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger

import scala.collection.mutable.ArrayBuffer

object ErrorHandler {
@transient
private[this] val LOG = Logger.getLogger(this.getClass)

/**
* clean all the failed data in the error path before reload.
*
* @param path path to clean
*/
def clear(path: String): Unit = {
try {
val fileSystem = FileSystem.get(new Configuration())
val filesStatus = fileSystem.listStatus(new Path(path))
for (file <- filesStatus) {
if (!file.getPath.getName.startsWith("reload.")) {
fileSystem.delete(file.getPath, true)
}
}
} catch {
case e: Throwable => {
LOG.error(s"$path cannot be cleaned, but this error does not affect the import result; " +
s"you only need to focus on the reload files.",
e)
}
}
}

/**
* save the statements that failed to execute.
*
* @param buffer buffer holding the failed nGQL statements
* @param path   path to write the failed statements to
*/
def save(buffer: ArrayBuffer[String], path: String): Unit = {
LOG.info(s"create reload path $path")
val fileSystem = FileSystem.get(new Configuration())
val errors = fileSystem.create(new Path(path))

@@ -25,6 +60,12 @@ object ErrorHandler {
}
}

/**
* check if path exists
*
* @param path error path
* @return true if path exists
*/
def existError(path: String): Boolean = {
val fileSystem = FileSystem.get(new Configuration())
fileSystem.exists(new Path(path))
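Not all call sites of these helpers are visible in this diff, so the following is only a rough sketch of how they fit together; the path, tag name, and sample statement below are made up for illustration.

import scala.collection.mutable.ArrayBuffer

// Placeholder values, not taken from the repository.
val errorPath = "/tmp/exchange-errors"
val failed    = ArrayBuffer("INSERT VERTEX player(name) VALUES \"p1\":(\"Tom\")")

// Executor side: persist the statements that could not be written,
// one file per tag/edge and Spark partition (see EdgeProcessor below).
ErrorHandler.save(failed, s"$errorPath/player.0")

// Driver side: only start a reload pass when failure files exist.
if (ErrorHandler.existError(errorPath)) {
  // re-execute the saved statements via ReloadProcessor (see Exchange.scala below)
}

// Drop everything except the reload.* outputs written by the reload pass.
ErrorHandler.clear(errorPath)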
Exchange.scala
@@ -121,6 +121,9 @@ object Exchange {
sys.exit(0)
}

// record the failed batch number
var failures: Long = 0L

// import tags
if (configs.tagsConfig.nonEmpty) {
for (tagConfig <- configs.tagsConfig) {
@@ -150,6 +153,7 @@
if (tagConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${tagConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${tagConfig.name}: ${batchFailure.value}")
failures += batchFailure.value
} else {
LOG.info(s"SST-Import: failure.${tagConfig.name}: ${batchFailure.value}")
}
@@ -186,6 +190,7 @@
if (edgeConfig.dataSinkConfigEntry.category == SinkCategory.CLIENT) {
LOG.info(s"Client-Import: batchSuccess.${edgeConfig.name}: ${batchSuccess.value}")
LOG.info(s"Client-Import: batchFailure.${edgeConfig.name}: ${batchFailure.value}")
failures += batchFailure.value
} else {
LOG.info(s"SST-Import: failure.${edgeConfig.name}: ${batchFailure.value}")
}
@@ -196,11 +201,12 @@
}

// reimport for failed tags and edges
- if (ErrorHandler.existError(configs.errorConfig.errorPath)) {
+ if (failures > 0 && ErrorHandler.existError(configs.errorConfig.errorPath)) {
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.reimport")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.reimport")
val data = spark.read.text(configs.errorConfig.errorPath)
- val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
+ data.count()
+ val processor = new ReloadProcessor(data, configs, batchSuccess, batchFailure)
processor.process()
LOG.info(s"batchSuccess.reimport: ${batchSuccess.value}")
LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
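Two things change in the reload block above: the reimport now requires failures > 0 in addition to the error-path check, so files left over from an earlier run no longer trigger a reimport on their own, and a data.count() action is issued before process(), presumably to force the error files to be read up front before the reload pass begins.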
EdgeProcessor.scala
@@ -85,9 +85,8 @@ class EdgeProcessor(data: DataFrame,
s"${config.errorConfig.errorPath}/${edgeConfig.name}.${TaskContext.getPartitionId}")
errorBuffer.clear()
}
- LOG.info(
- s"spark partition for edge cost time:" +
- s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}")
+ LOG.info(s"edge ${edgeConfig.name} import in spark partition ${TaskContext
+ .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
writer.close()
graphProvider.close()
}
ReloadProcessor.scala
@@ -8,6 +8,8 @@ package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.exchange.{ErrorHandler, GraphProvider}
import com.vesoft.nebula.exchange.config.Configs
import com.vesoft.nebula.exchange.writer.NebulaGraphClientWriter
import org.apache.log4j.Logger
import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.LongAccumulator
Expand All @@ -19,6 +21,8 @@ class ReloadProcessor(data: DataFrame,
batchSuccess: LongAccumulator,
batchFailure: LongAccumulator)
extends Processor {
@transient
private[this] lazy val LOG = Logger.getLogger(this.getClass)

override def process(): Unit = {
data.foreachPartition(processEachPartition(_))
@@ -27,27 +31,35 @@
private def processEachPartition(iterator: Iterator[Row]): Unit = {
val graphProvider =
new GraphProvider(config.databaseConfig.getGraphAddress, config.connectionConfig.timeout)
- val session = graphProvider.getGraphClient(config.userConfig)
- if (session == null) {
- throw new IllegalArgumentException("connect to graph failed.")
- }

+ val writer = new NebulaGraphClientWriter(config.databaseConfig,
+ config.userConfig,
+ config.rateConfig,
+ null,
+ graphProvider)

val errorBuffer = ArrayBuffer[String]()

- iterator.foreach(row => {
- val exec = row.getString(0)
- val result = session.execute(exec)
- if (result == null || !result.isSucceeded) {
- errorBuffer.append(exec)
- batchFailure.add(1)
- } else {
+ writer.prepare()
+ // batch write
+ val startTime = System.currentTimeMillis
+ iterator.foreach { row =>
+ val failStatement = writer.writeNgql(row.getString(0))
+ if (failStatement == null) {
+ batchSuccess.add(1)
+ } else {
+ errorBuffer.append(failStatement)
+ batchFailure.add(1)
+ }
- })
+ }
if (errorBuffer.nonEmpty) {
ErrorHandler.save(errorBuffer,
s"${config.errorConfig.errorPath}/reload.${TaskContext.getPartitionId()}")
errorBuffer.clear()
}
+ LOG.info(s"data reload in partition ${TaskContext
+ .getPartitionId()} cost ${System.currentTimeMillis() - startTime}ms")
+ writer.close()
graphProvider.close()
}
}
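The reload path is now symmetric with the normal import path: the processor builds a NebulaGraphClientWriter (the fourth constructor argument is passed as null, presumably because reload rows are already complete nGQL statements and need no tag/edge schema config), calls prepare(), which is where the space switch referenced in the PR title is expected to happen, and hands each row to writeNgql; any statement that comes back non-null is buffered and saved again as reload.<partition> for the next attempt.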
VerticesProcessor.scala
@@ -93,9 +93,8 @@ class VerticesProcessor(data: DataFrame,
s"${config.errorConfig.errorPath}/${tagConfig.name}.${TaskContext.getPartitionId()}")
errorBuffer.clear()
}
- LOG.info(
- s"spark partition for vertex cost time:" +
- s"${TaskContext.getPartitionId()}-${System.currentTimeMillis() - startTime}")
+ LOG.info(s"tag ${tagConfig.name} import in spark partition ${TaskContext
+ .getPartitionId()} cost ${System.currentTimeMillis() - startTime} ms")
writer.close()
graphProvider.close()
}
ServerBaseWriter.scala
@@ -111,6 +111,8 @@ abstract class ServerBaseWriter extends Writer {
def writeVertices(vertices: Vertices): String

def writeEdges(edges: Edges): String

def writeNgql(ngql: String): String
}

/**
@@ -173,6 +175,20 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
sentence
}

override def writeNgql(ngql: String): String = {
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, ngql)
if (result.isSucceeded) {
return null
}
LOG.error(s"reimport ngql failed for ${result.getErrorMessage}")
} else {
LOG.error(s"reimport ngql failed because write speed is too fast")
}
LOG.info(ngql)
ngql
}

override def close(): Unit = {
graphProvider.releaseGraphClient(session)
}
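writeNgql returns null on success and the original statement on failure (including when the rate limiter times out), which is what lets ReloadProcessor buffer the returned statements and save them as reload.* files. The prepare() step invoked before it in ReloadProcessor is not shown in this diff; a minimal sketch of what it is expected to do for this commit, assuming DataBaseConfigEntry exposes the target space as a space field, could look like:

// Hypothetical sketch only; the real prepare() body is outside this diff.
// It reuses graphProvider.submit(), the same call writeNgql makes above.
def prepare(): Unit = {
  val result = graphProvider.submit(session, s"USE ${dataBaseConfigEntry.space}")
  if (result == null || !result.isSucceeded) {
    throw new RuntimeException(s"switch space ${dataBaseConfigEntry.space} failed")
  }
}

Because writeNgql is declared abstract on ServerBaseWriter, NebulaStorageClientWriter must also provide it, hence the ??? stub in the hunk below.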
@@ -241,5 +257,7 @@ class NebulaStorageClientWriter(addresses: List[(String, Int)], space: String)

override def writeEdges(edges: Edges): String = ???

override def writeNgql(ngql: String): String = ???

override def close(): Unit = {}
}
