Skip to content

Commit

Permalink
support delete related edges when delete vertex
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Aug 16, 2022
1 parent afdb61d commit ace6db4
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,14 @@ class WriteNebulaVertexConfig(space: String,
vidAsProp: Boolean,
user: String,
passwd: String,
writeMode: String)
writeMode: String,
deleteEdge: Boolean)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode) {
def getTagName = tagName
def getVidField = vidField
def getVidPolicy = if (vidPolicy == null) "" else vidPolicy
def getVidAsProp = vidAsProp
def getTagName = tagName
def getVidField = vidField
def getVidPolicy = if (vidPolicy == null) "" else vidPolicy
def getVidAsProp = vidAsProp
def getDeleteEdge = deleteEdge
}

/**
Expand All @@ -270,6 +272,9 @@ object WriteNebulaVertexConfig {
/** whether set vid as property */
var vidAsProp: Boolean = false

/** whether delete the related edges of vertex */
var deleteEdge: Boolean = false

/**
* set space name
*/
Expand Down Expand Up @@ -343,6 +348,14 @@ object WriteNebulaVertexConfig {
this
}

/**
* set whether delete related edges when delete vertex
*/
def withDeleteEdge(deleteEdge: Boolean): WriteVertexConfigBuilder = {
this.deleteEdge = deleteEdge
this
}

/**
* check and get WriteNebulaVertexConfig
*/
Expand All @@ -356,7 +369,8 @@ object WriteNebulaVertexConfig {
vidAsProp,
user,
passwd,
writeMode)
writeMode,
deleteEdge)
}

private def check(): Unit = {
Expand Down Expand Up @@ -388,7 +402,7 @@ object WriteNebulaVertexConfig {
}
LOG.info(
s"NebulaWriteVertexConfig={space=$space,tagName=$tagName,vidField=$vidField," +
s"vidPolicy=$vidPolicy,batch=$batch,writeMode=$writeMode}")
s"vidPolicy=$vidPolicy,batch=$batch,writeMode=$writeMode,deleteEdge=$deleteEdge}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
var dstAsProp: Boolean = _
var rankAsProp: Boolean = _
var writeMode: WriteMode.Value = _
var deleteEdge: Boolean = _

if (operaType == OperaType.WRITE) {
require(parameters.isDefinedAt(GRAPH_ADDRESS),
Expand Down Expand Up @@ -166,6 +167,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
rankAsProp = parameters.getOrElse(RANK_AS_PROP, false).toString.toBoolean
writeMode =
WriteMode.withName(parameters.getOrElse(WRITE_MODE, DEFAULT_WRITE_MODE).toString.toLowerCase)
deleteEdge = parameters.getOrElse(DELETE_EDGE, false).toString.toBoolean
}

def getReturnCols: List[String] = {
Expand Down Expand Up @@ -249,6 +251,7 @@ object NebulaOptions {
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"

val DEFAULT_TIMEOUT: Int = 3000
val DEFAULT_CONNECTION_TIMEOUT: Int = 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object NebulaTemplate {
private[connector] val UPDATE_EDGE_TEMPLATE = "UPDATE %s ON `%s` %s->%s@%d SET %s"
private[connector] val UPDATE_VALUE_TEMPLATE = "`%s`=%s"

private[connector] val DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"
private[connector] val DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"
private[connector] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"
private[connector] val DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"
private[connector] val DELETE_VERTEX_WITH_EDGE_TEMPLATE = "DELETE VERTEX %s WITH EDGE"
private[connector] val DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"
private[connector] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ package object connector {
.option(NebulaOptions.BATCH, writeConfig.getBatch)
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.vesoft.nebula.connector.NebulaTemplate.{
BATCH_INSERT_TEMPLATE,
DELETE_EDGE_TEMPLATE,
DELETE_VERTEX_TEMPLATE,
DELETE_VERTEX_WITH_EDGE_TEMPLATE,
EDGE_ENDPOINT_TEMPLATE,
EDGE_VALUE_TEMPLATE,
EDGE_VALUE_WITHOUT_RANKING_TEMPLATE,
Expand Down Expand Up @@ -365,26 +366,31 @@ object NebulaExecutor {
/**
* construct delete statement for vertex
*/
def toDeleteExecuteStatement(vertices: NebulaVertices): String = {
DELETE_VERTEX_TEMPLATE.format(
vertices.values
.map { value =>
vertices.policy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.vertexIDSlice)
def toDeleteExecuteStatement(vertices: NebulaVertices, deleteEdge: Boolean): String = {
if (deleteEdge)
DELETE_VERTEX_WITH_EDGE_TEMPLATE.format(genDeleteVertexInfo(vertices))
else
DELETE_VERTEX_TEMPLATE.format(genDeleteVertexInfo(vertices))
}

case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.vertexIDSlice)
private def genDeleteVertexInfo(vertices: NebulaVertices): String = {
vertices.values
.map { value =>
vertices.policy match {
case Some(KeyPolicy.HASH) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.vertexIDSlice)

case None =>
value.vertexIDSlice
case _ =>
throw new IllegalArgumentException(
s"vertex policy ${vertices.policy.get} is not supported")
}
case Some(KeyPolicy.UUID) =>
ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.vertexIDSlice)

case None =>
value.vertexIDSlice
case _ =>
throw new IllegalArgumentException(
s"vertex policy ${vertices.policy.get} is not supported")
}
.mkString(",")
)
}
.mkString(",")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaVertices)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaVertices)
case WriteMode.DELETE => NebulaExecutor.toDeleteExecuteStatement(nebulaVertices)
case WriteMode.DELETE =>
NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, nebulaOptions.deleteEdge)
case _ =>
throw new IllegalArgumentException(s"write mode ${nebulaOptions.writeMode} not supported.")
}
Expand Down

0 comments on commit ace6db4

Please sign in to comment.