Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

add batch delete for nebula-exchange #151

Merged
merged 4 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ object SslType extends Enumeration {
val SELF = Value("self")
}

/**
 * Write modes supported when sinking data into NebulaGraph.
 *
 * Each value's string form is the lower-case spelling accepted by the
 * `writeMode` setting in a tag/edge configuration (parsing itself is
 * case-insensitive, see Configs.toWriteModeCategory).
 */
object WriteMode extends Enumeration {
// Type alias so callers can reference this enum's type as WriteMode.Mode.
type Mode = Value
val INSERT = Value("insert")
// NOTE(review): UPDATE currently falls back to the INSERT statement in the
// writers — see NebulaGraphClientWriter.execute; confirm before relying on it.
val UPDATE = Value("update")
val DELETE = Value("delete")
}

/**
* DataBaseConfigEntry describe the nebula cluster's address and which space will be used.
*
Expand Down Expand Up @@ -261,6 +268,7 @@ object Configs {
private[this] val DEFAULT_STREAM_INTERVAL = 30
private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
private[this] val DEFAULT_PARALLEL = 1
private[this] val DEFAULT_WRITE_MODE = "INSERT"

/**
*
Expand Down Expand Up @@ -429,7 +437,10 @@ object Configs {
val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig)
LOG.info(s"Sink Config ${sourceConfig}")

val batch = getOrElse(tagConfig, "batch", DEFAULT_BATCH)
// val writeMode = toWriteModeCategory(tagConfig.getString("writeMode"))
val writeModeStr = getOrElse(tagConfig, "writeMode", DEFAULT_WRITE_MODE)
val writeMode = toWriteModeCategory(writeModeStr)
val batch = getOrElse(tagConfig, "batch", DEFAULT_BATCH)
val checkPointPath =
if (tagConfig.hasPath("check_point_path")) Some(tagConfig.getString("check_point_path"))
else DEFAULT_CHECK_POINT_PATH
Expand All @@ -440,6 +451,7 @@ object Configs {
val partition = getOrElse(tagConfig, "partition", DEFAULT_PARTITION)
val repartitionWithNebula = getOrElse(tagConfig, "repartitionWithNebula", true)
val ignoreIndex = getOrElse(tagConfig, "ignoreIndex", false)
val deleteEdge = getOrElse(tagConfig, "deleteEdge", false)

val vertexUdf = if (tagConfig.hasPath("vertex.udf")) {
val sep = tagConfig.getString("vertex.udf.separator")
Expand All @@ -455,6 +467,7 @@ object Configs {
sinkConfig,
fields,
nebulaFields,
writeMode,
vertexField,
policyOpt,
prefix,
Expand All @@ -464,6 +477,7 @@ object Configs {
repartitionWithNebula,
enableTagless,
ignoreIndex,
deleteEdge,
vertexUdf
)
LOG.info(s"Tag Config: ${entry}")
Expand Down Expand Up @@ -562,7 +576,9 @@ object Configs {
None
}

val batch = getOrElse(edgeConfig, "batch", DEFAULT_BATCH)
val writeModeStr = getOrElse(edgeConfig, "writeMode", DEFAULT_WRITE_MODE)
val writeMode = toWriteModeCategory(writeModeStr)
val batch = getOrElse(edgeConfig, "batch", DEFAULT_BATCH)
val checkPointPath =
if (edgeConfig.hasPath("check_point_path")) Some(edgeConfig.getString("check_point_path"))
else DEFAULT_CHECK_POINT_PATH
Expand Down Expand Up @@ -595,6 +611,7 @@ object Configs {
sinkConfig,
fields,
nebulaFields,
writeMode,
sourceField,
sourcePolicy,
sourcePrefix,
Expand Down Expand Up @@ -672,6 +689,21 @@ object Configs {
}
}

/**
 * Resolve the raw `writeMode` configuration string into a [[WriteMode]] value.
 *
 * Matching ignores surrounding whitespace and is case-insensitive, so
 * "insert", " Insert " and "INSERT" are all accepted.
 *
 * @param category the raw `writeMode` string taken from the tag/edge config
 * @return the corresponding [[WriteMode.Mode]]
 * @throws IllegalArgumentException if the string is none of insert/update/delete
 */
private[this] def toWriteModeCategory(category: String): WriteMode.Mode = {
  category.trim.toUpperCase match {
    case "INSERT" => WriteMode.INSERT
    case "UPDATE" => WriteMode.UPDATE
    case "DELETE" => WriteMode.DELETE
    case _ =>
      // Name the offending value and the accepted set so misconfiguration is
      // diagnosable from the log alone (old message: "${category} not support").
      throw new IllegalArgumentException(
        s"write mode ${category} is not supported, expected one of: insert, update, delete")
  }
}

/**
* Use to generate data source com.vesoft.exchange.common.config according to category of source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ sealed trait SchemaConfigEntry {

/** check point path */
def checkPointPath: Option[String]

/** write mode */
def writeMode: WriteMode.Mode
}

/**
Expand All @@ -55,6 +58,7 @@ case class TagConfigEntry(override val name: String,
override val dataSinkConfigEntry: DataSinkConfigEntry,
override val fields: List[String],
override val nebulaFields: List[String],
override val writeMode: WriteMode.Mode,
vertexField: String,
vertexPolicy: Option[KeyPolicy.Value],
vertexPrefix: String,
Expand All @@ -64,6 +68,7 @@ case class TagConfigEntry(override val name: String,
repartitionWithNebula: Boolean = true,
enableTagless: Boolean = false,
ignoreIndex: Boolean = false,
deleteEdge: Boolean = false,
vertexUdf: Option[UdfConfigEntry] = None)
extends SchemaConfigEntry {
require(
Expand All @@ -74,6 +79,7 @@ case class TagConfigEntry(override val name: String,
s"Tag name: $name, " +
s"source: $dataSourceConfigEntry, " +
s"sink: $dataSinkConfigEntry, " +
s"writeMode: $writeMode, " +
s"vertex field: $vertexField, " +
s"vertex policy: $vertexPolicy, " +
s"batch: $batch, " +
Expand Down Expand Up @@ -109,6 +115,7 @@ case class EdgeConfigEntry(override val name: String,
override val dataSinkConfigEntry: DataSinkConfigEntry,
override val fields: List[String],
override val nebulaFields: List[String],
override val writeMode: WriteMode.Mode,
sourceField: String,
sourcePolicy: Option[KeyPolicy.Value],
sourcePrefix: String,
Expand Down Expand Up @@ -136,6 +143,7 @@ case class EdgeConfigEntry(override val name: String,
s"Edge name: $name, " +
s"source: $dataSourceConfigEntry, " +
s"sink: $dataSinkConfigEntry, " +
s"writeMode: $writeMode, " +
s"latitude: $latitude, " +
s"longitude: $longitude, " +
s"source field: $sourceField, " +
Expand All @@ -152,6 +160,7 @@ case class EdgeConfigEntry(override val name: String,
s"Edge name: $name, " +
s"source: $dataSourceConfigEntry, " +
s"sink: $dataSinkConfigEntry, " +
s"writeMode: $writeMode, " +
s"source field: $sourceField, " +
s"source policy: $sourcePolicy, " +
s"ranking: $rankingField, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,10 @@
package com.vesoft.exchange.common.writer

import java.util.concurrent.TimeUnit

import com.google.common.util.concurrent.RateLimiter
import com.vesoft.exchange.common.GraphProvider
import com.vesoft.exchange.common.{Edges, KeyPolicy, Vertices}
import com.vesoft.exchange.common.config.{
DataBaseConfigEntry,
RateConfigEntry,
SchemaConfigEntry,
Type,
UserConfigEntry
}
import com.vesoft.exchange.common.config.{DataBaseConfigEntry, EdgeConfigEntry, RateConfigEntry, SchemaConfigEntry, TagConfigEntry, Type, UserConfigEntry, WriteMode}
import com.vesoft.nebula.ErrorCode
import org.apache.log4j.Logger

Expand All @@ -30,6 +23,12 @@ abstract class ServerBaseWriter extends Writer {
private[this] val EDGE_VALUE_WITHOUT_RANKING_TEMPLATE = "%s->%s: (%s)"
private[this] val EDGE_VALUE_TEMPLATE = "%s->%s@%d: (%s)"

private[this] val BATCH_DELETE_VERTEX_TEMPLATE = "DELETE %s %s"
private[this] val BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE = "DELETE %s %s WITH EDGE"
private[this] val DELETE_VALUE_TEMPLATE = "%s"
private[this] val BATCH_DELETE_EDGE_TEMPLATE = "DELETE %s `%s` %s"
private[this] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"

def toExecuteSentence(name: String, vertices: Vertices, ignoreIndex: Boolean): String = {
{ if (ignoreIndex) BATCH_INSERT_IGNORE_INDEX_TEMPLATE else BATCH_INSERT_TEMPLATE }
.format(
Expand Down Expand Up @@ -58,6 +57,32 @@ abstract class ServerBaseWriter extends Writer {
)
}

/**
 * Build a batch `DELETE VERTEX` nGQL statement for the given vertices.
 *
 * @param vertices   batch of vertices whose ids form the delete list
 * @param deleteEdge when true, uses the "... WITH EDGE" template variant so the
 *                   statement also removes edges attached to the vertices
 * @return the delete statement, ids joined by ", "
 * @throws IllegalArgumentException if the configured key policy is neither HASH nor UUID
 */
def toDeleteExecuteSentence(vertices: Vertices, deleteEdge: Boolean): String = {
{ if (deleteEdge) BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE else BATCH_DELETE_VERTEX_TEMPLATE }
.format(
Type.VERTEX.toString,
vertices.values
.map { vertex =>
// No key policy: emit the raw vertex id as-is.
if (vertices.policy.isEmpty) {
DELETE_VALUE_TEMPLATE.format(vertex.vertexID)
} else {
// With a policy, wrap the id in the hash()/uuid() function form.
vertices.policy.get match {
case KeyPolicy.HASH =>
ENDPOINT_TEMPLATE
.format(KeyPolicy.HASH.toString, vertex.vertexID)
case KeyPolicy.UUID =>
ENDPOINT_TEMPLATE
.format(KeyPolicy.UUID.toString, vertex.vertexID)
case _ =>
throw new IllegalArgumentException(
s"invalidate vertex policy ${vertices.policy.get}")
}
}
}
.mkString(", ")
)
}

def toExecuteSentence(name: String, edges: Edges, ignoreIndex: Boolean): String = {
val values = edges.values
.map { edge =>
Expand Down Expand Up @@ -100,6 +125,40 @@ abstract class ServerBaseWriter extends Writer {
values)
}

/**
 * Build a batch `DELETE EDGE` nGQL statement for the given edges.
 *
 * @param edgeName name of the edge type being deleted
 * @param edges    batch of edges; each contributes one "src->dst@rank" endpoint
 * @return the delete statement, endpoints joined by ", "
 * @throws IllegalArgumentException if a source/target key policy is neither HASH nor UUID
 */
def toDeleteExecuteSentence(edgeName: String, edges: Edges): String = {
  // Wraps a raw vertex id in the hash()/uuid() function form when a key
  // policy is configured; passes it through untouched otherwise.
  def renderEndpoint(id: Any, policy: Option[KeyPolicy.Value], role: String): Any =
    policy match {
      case None                 => id
      case Some(KeyPolicy.HASH) => ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, id)
      case Some(KeyPolicy.UUID) => ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, id)
      case Some(other) =>
        throw new IllegalArgumentException(s"$role vertex policy ${other} is not supported")
    }

  val endpoints = edges.values
    .map { edge =>
      val source      = renderEndpoint(edge.source, edges.sourcePolicy, "source")
      val destination = renderEndpoint(edge.destination, edges.targetPolicy, "target")
      // Missing rank defaults to 0, matching NebulaGraph's default edge rank.
      val rank = if (edge.ranking.isEmpty) 0 else edge.ranking.get
      EDGE_ENDPOINT_TEMPLATE.format(source, destination, rank)
    }
    .mkString(", ")

  BATCH_DELETE_EDGE_TEMPLATE.format(Type.EDGE.toString, edgeName, endpoints)
}

def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String

def writeEdges(edges: Edges, ignoreIndex: Boolean): String
Expand Down Expand Up @@ -137,10 +196,41 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
}

/**
 * Render the nGQL statement for one batch of vertices according to the
 * tag's configured write mode.
 *
 * @param vertices  batch of vertices to write
 * @param writeMode INSERT, UPDATE or DELETE (from the tag config)
 * @return the statement text to submit to the graph service
 * @throws IllegalArgumentException for an unrecognised write mode
 */
def execute(vertices: Vertices, writeMode: WriteMode.Mode): String =
  writeMode match {
    case WriteMode.INSERT | WriteMode.UPDATE =>
      // TODO: UPDATE has no dedicated statement yet and currently falls
      // back to the same INSERT sentence.
      toExecuteSentence(config.name, vertices, config.asInstanceOf[TagConfigEntry].ignoreIndex)
    case WriteMode.DELETE =>
      // DELETE VERTEX; the deleteEdge flag selects the "WITH EDGE" variant.
      toDeleteExecuteSentence(vertices, config.asInstanceOf[TagConfigEntry].deleteEdge)
    case _ =>
      throw new IllegalArgumentException(s"write mode ${writeMode} not supported.")
  }

/**
 * Render the nGQL statement for one batch of edges according to the
 * edge type's configured write mode.
 *
 * @param edges     batch of edges to write
 * @param writeMode INSERT, UPDATE or DELETE (from the edge config)
 * @return the statement text to submit to the graph service
 * @throws IllegalArgumentException for an unrecognised write mode
 */
def execute(edges: Edges, writeMode: WriteMode.Mode): String =
  writeMode match {
    case WriteMode.INSERT | WriteMode.UPDATE =>
      // TODO: UPDATE has no dedicated statement yet and currently falls
      // back to the same INSERT sentence.
      toExecuteSentence(config.name, edges, config.asInstanceOf[EdgeConfigEntry].ignoreIndex)
    case WriteMode.DELETE =>
      toDeleteExecuteSentence(config.name, edges)
    case _ =>
      throw new IllegalArgumentException(s"write mode ${writeMode} not supported.")
  }

override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): String = {
val sentence = toExecuteSentence(config.name, vertices, ignoreIndex)
// val sentence = toExecuteSentence(config.name, vertices, ignoreIndex)
val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, sentence)
val result = graphProvider.submit(session, statement)
if (result.isSucceeded) {
LOG.info(
s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})")
Expand All @@ -154,11 +244,12 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
} else {
LOG.error(s"write vertex failed because write speed is too fast")
}
sentence
statement
}

override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): String = {
val sentence = toExecuteSentence(config.name, edges, ignoreIndex)
val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode)
if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) {
val result = graphProvider.submit(session, sentence)
if (result.isSucceeded) {
Expand Down
9 changes: 9 additions & 0 deletions exchange-common/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
source: parquet
sink: client
}
writeMode: INSERT
path: path0
fields: [parquet-field-0, parquet-field-1, parquet-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
Expand All @@ -110,6 +111,7 @@
source: csv
sink: sst
}
writeMode: INSERT
path: path1
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [csv-field-0, csv-field-1, csv-field-2]
Expand All @@ -130,6 +132,7 @@
source: json
sink: client
}
writeMode: DELETE
path: path3
fields: [json-field-0, json-field-1, json-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
Expand All @@ -148,6 +151,7 @@
source: hive
sink: client
}
writeMode: INSERT
exec: "select hive-field0, hive-field1, hive-field2 from database.table"
fields: [hive-field-0, hive-field-1, hive-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
Expand All @@ -166,6 +170,7 @@
source: neo4j
sink: client
}
writeMode: INSERT
server: "bolt://127.0.0.1:7687"
user: neo4j
password: neo4j
Expand All @@ -189,6 +194,7 @@
source: hbase
sink: client
}
writeMode: INSERT
host:127.0.0.1
port:2181
table:hbase-table
Expand All @@ -209,6 +215,7 @@
source: pulsar
sink: client
}
writeMode: INSERT
service: "pulsar://localhost:6650"
admin: "http://localhost:8081"
options: {
Expand Down Expand Up @@ -251,6 +258,7 @@
source: mysql
sink: client
}
writeMode: INSERT
user:root
host: "127.0.0.1"
port: "3306"
Expand All @@ -276,6 +284,7 @@
source: postgresql
sink: client
}
writeMode: INSERT
user:root
host: "127.0.0.1"
port: "5432"
Expand Down
Loading