Skip to content

Commit

Permalink
add retry for write (#182)
Browse files Browse the repository at this point in the history
* add retry for write

* fix test
  • Loading branch information
Nicole00 authored Dec 1, 2023
1 parent 741e4ef commit 1e8a664
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ case class UserConfigEntry(user: String, password: String) {
* @param retry
*/
/**
  * Connection configuration for NebulaGraph clients.
  *
  * @param timeout connection timeout in milliseconds, must be positive
  * @param retry   number of connection attempts, must be positive
  */
case class ConnectionConfigEntry(timeout: Int, retry: Int) {
  require(timeout > 0 && retry > 0, "connection timeout or retry must be larger than 0")

  // fixed typo: was "cConnectionConfigEntry"
  override def toString: String = s"ConnectionConfigEntry:{timeout:$timeout, retry:$retry}"
}
Expand All @@ -120,7 +120,7 @@ case class ConnectionConfigEntry(timeout: Int, retry: Int) {
* @param interval
*/
/**
  * Execution configuration for statement submission.
  *
  * @param timeout  execution timeout in milliseconds, must be positive
  * @param retry    max number of retries for transient write failures (0 disables retry)
  * @param interval sleep between retries in milliseconds (0 means retry immediately)
  */
case class ExecutionConfigEntry(timeout: Int, retry: Int, interval: Int) {
  require(timeout > 0, "execution timeout must be larger than 0")
  // retry and interval may legitimately be 0 (no retry / no pause), but negative
  // values would break the retry loop and Thread.sleep — reject them up front.
  require(retry >= 0, "execution retry cannot be negative")
  require(interval >= 0, "execution interval cannot be negative")

  // include interval: it is part of this entry and was missing from the output
  override def toString: String =
    s"ExecutionConfigEntry:{timeout:$timeout, retry:$retry, interval:$interval}"
}
Expand Down Expand Up @@ -172,7 +172,8 @@ case class SslConfigEntry(enableGraph: Boolean,
}
}

// Summarize the SSL switches and sign type; sign-param contents are intentionally
// omitted (they may reference key/cert file paths).
override def toString: String =
  s"SslConfigEntry:{enableGraph:$enableGraph, enableMeta:$enableMeta, signType:${signType.toString}}"
}

case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath: String)
Expand Down Expand Up @@ -256,7 +257,7 @@ object Configs {
private[this] val DEFAULT_CONNECTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_RETRY  = 3
private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
// retry interval in ms; 0 means retry immediately after a transient write failure
private[this] val DEFAULT_EXECUTION_INTERVAL = 0
private[this] val DEFAULT_ERROR_OUTPUT_PATH  = "file:///tmp/nebula.writer.errors/"
private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
private[this] val DEFAULT_RATE_LIMIT           = 1024
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class ReloadProcessor(data: DataFrame,
config.userConfig,
config.rateConfig,
null,
graphProvider)
graphProvider,
config.executionConfig)

val errorBuffer = ArrayBuffer[String]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,9 @@ import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.RateLimiter
import com.vesoft.exchange.common.GraphProvider
import com.vesoft.exchange.common.{Edges, KeyPolicy, Vertices}
import com.vesoft.exchange.common.config.{
DataBaseConfigEntry,
EdgeConfigEntry,
RateConfigEntry,
SchemaConfigEntry,
TagConfigEntry,
Type,
UserConfigEntry,
WriteMode
}
import com.vesoft.exchange.common.config.{DataBaseConfigEntry, EdgeConfigEntry, ExecutionConfigEntry, RateConfigEntry, SchemaConfigEntry, TagConfigEntry, Type, UserConfigEntry, WriteMode}
import com.vesoft.nebula.ErrorCode
import com.vesoft.nebula.client.graph.data.{HostAddress, ResultSet}
import org.apache.log4j.Logger

import scala.collection.JavaConversions.seqAsJavaList
Expand Down Expand Up @@ -282,7 +274,8 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
userConfigEntry: UserConfigEntry,
rateConfig: RateConfigEntry,
config: SchemaConfigEntry,
graphProvider: GraphProvider)
graphProvider: GraphProvider,
executeConfig: ExecutionConfigEntry)
extends ServerBaseWriter {
private val LOG = Logger.getLogger(this.getClass)

Expand Down Expand Up @@ -347,7 +340,7 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
throw new RuntimeException(
s"write ${config.name} failed for E_BAD_PERMISSION: ${result._2.getErrorMessage}")
}
// batch write failed (non-fatal): warn once, then fall back to one-by-one writes
LOG.warn(
  s">>>>>> write ${config.name} failed for: ${result._2.getErrorMessage}, now retry writing one by one.")
// re-execute the vertices one by one
vertices.values.foreach(value => {
Expand All @@ -362,16 +355,34 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
}

/**
  * Write one batch of vertices with the prepared INSERT/UPDATE/DELETE statement.
  * Transient storage errors (RPC timeout, full raft buffer, leader change) are
  * retried up to executeConfig.retry times, sleeping executeConfig.interval ms
  * between attempts.
  *
  * @return null on success, or the failed statement so the caller can record it
  */
private def writeVertex(vertices: Vertices): String = {
  val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
  val result    = graphProvider.submit(session, statement)
  if (result._2.isSucceeded) {
    LOG.info(
      s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${result._2.getLatency})")
    return null
  }
  // First attempt failed: retry only for errors known to be transient.
  var finalResult: ResultSet = result._2
  // BUG FIX: retry was a `val` that was never incremented, so the loop bound never
  // advanced and a persistent transient error looped forever.
  var retry = 0
  while (retry < executeConfig.retry && (finalResult.getErrorMessage.contains(
           "Storage Error: RPC failure, probably timeout")
         || finalResult.getErrorMessage.contains("raft buffer is full. Please retry later")
         || finalResult.getErrorMessage.contains("The leader has changed"))) {
    Thread.sleep(executeConfig.interval)
    val retryResult = graphProvider.submit(session, statement)
    finalResult = retryResult._2
    retry += 1
    if (finalResult.isSucceeded) {
      LOG.info(
        s">>>>> write ${config.name}, batch size(${vertices.values.size}), latency(${finalResult.getLatency})")
      return null
    }
  }
  // Still failed after exhausting retries: return the statement for error output.
  LOG.error(
    s">>>>> write vertex failed for ${finalResult.getErrorMessage} statement: \n $statement")
  statement
}

override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): List[String] = {
Expand Down Expand Up @@ -409,11 +420,27 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
if (result._2.isSucceeded) {
  LOG.info(
    s">>>>> write ${config.name}, batch size(${edges.values.size}), latency(${result._2.getLatency})")
  return null
}
// First attempt failed: retry only for errors known to be transient.
var finalResult: ResultSet = result._2
// BUG FIX: retry was a `val` that was never incremented, so the loop bound never
// advanced and a persistent transient error looped forever.
var retry = 0
while (retry < executeConfig.retry && (finalResult.getErrorMessage.contains(
         "Storage Error: RPC failure, probably timeout")
       || finalResult.getErrorMessage.contains("raft buffer is full. Please retry later")
       || finalResult.getErrorMessage.contains("The leader has changed"))) {
  Thread.sleep(executeConfig.interval)
  val retryResult = graphProvider.submit(session, statement)
  finalResult = retryResult._2
  retry += 1
  if (finalResult.isSucceeded) {
    LOG.info(
      s">>>>> write ${config.name}, batch size(${edges.values.size}), latency(${finalResult.getLatency})")
    return null
  }
}
// BUG FIX: log the LAST attempt's error (was logging the first attempt's result._2),
// then return the statement so it can be written to the error output.
LOG.error(s">>>>> write edge failed for ${finalResult.getErrorMessage} statement: \n $statement")
statement

}

override def writeNgql(ngql: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ConfigsSuite {
assert(connectionConfig.timeout == 3000)

assert(executionConfig.retry == 3)
// default execution interval is now 0 (retry immediately); was 3000
assert(executionConfig.interval == 0)
assert(executionConfig.timeout == Integer.MAX_VALUE)

assert(errorConfig.errorMaxSize == 32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class EdgeProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
edgeConfig,
graphProvider)
graphProvider,
config.executionConfig)
val errorBuffer = ArrayBuffer[String]()

writer.prepare()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ class VerticesProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
tagConfig,
graphProvider)
graphProvider,
config.executionConfig)

val errorBuffer = ArrayBuffer[String]()

writer.prepare()
// batch write tags
val startTime = System.currentTimeMillis
iterator.grouped(tagConfig.batch).foreach { vertexSet =>
val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy)
val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex)
if (failStatements.isEmpty) {
batchSuccess.add(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class EdgeProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
edgeConfig,
graphProvider)
graphProvider,
config.executionConfig)
val errorBuffer = ArrayBuffer[String]()

writer.prepare()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class VerticesProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
tagConfig,
graphProvider)
graphProvider,
config.executionConfig)

val errorBuffer = ArrayBuffer[String]()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class EdgeProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
edgeConfig,
graphProvider)
graphProvider,
config.executionConfig)
val errorBuffer = ArrayBuffer[String]()

writer.prepare()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ class VerticesProcessor(spark: SparkSession,
config.userConfig,
config.rateConfig,
tagConfig,
graphProvider)
graphProvider,
config.executionConfig)

val errorBuffer = ArrayBuffer[String]()

writer.prepare()
// batch write tags
val startTime = System.currentTimeMillis
iterator.grouped(tagConfig.batch).foreach { vertexSet =>
val vertices = Vertices(nebulaKeys, vertexSet.toList, tagConfig.vertexPolicy)
val failStatements = writer.writeVertices(vertices, tagConfig.ignoreIndex)
if (failStatements.isEmpty) {
batchSuccess.add(1)
Expand Down

0 comments on commit 1e8a664

Please sign in to comment.