Skip to content

Commit

Permalink
Insert vertex and edge without overwrite (#95)
Browse files Browse the repository at this point in the history
* insert without overwrite

* fix overwrite parameter

* fix statement template
  • Loading branch information
Nicole00 authored Apr 19, 2023
1 parent 1e01411 commit 4280cf4
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,15 @@ private[connector] class WriteNebulaConfig(space: String,
user: String,
passwd: String,
batch: Int,
writeMode: String)
writeMode: String,
overwrite: Boolean)
extends Serializable {
def getSpace = space
def getBatch = batch
def getUser = user
def getPasswd = passwd
def getWriteMode = writeMode
def isOverwrite = overwrite
}

/**
Expand All @@ -242,8 +244,9 @@ class WriteNebulaVertexConfig(space: String,
user: String,
passwd: String,
writeMode: String,
deleteEdge: Boolean)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode) {
deleteEdge: Boolean,
overwrite: Boolean)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite) {
def getTagName = tagName
def getVidField = vidField
def getVidPolicy = if (vidPolicy == null) "" else vidPolicy
Expand Down Expand Up @@ -275,6 +278,9 @@ object WriteNebulaVertexConfig {
/** whether delete the related edges of vertex */
var deleteEdge: Boolean = false

/** whether overwrite the exists vertex */
var overwrite: Boolean = true

/**
* set space name
*/
Expand Down Expand Up @@ -356,6 +362,14 @@ object WriteNebulaVertexConfig {
this
}

/**
* set whether overwrite the exists vertex
*/
def withOverwrite(overwrite: Boolean): WriteVertexConfigBuilder = {
this.overwrite = overwrite
this;
}

/**
* check and get WriteNebulaVertexConfig
*/
Expand All @@ -370,7 +384,8 @@ object WriteNebulaVertexConfig {
user,
passwd,
writeMode,
deleteEdge)
deleteEdge,
overwrite)
}

private def check(): Unit = {
Expand Down Expand Up @@ -436,8 +451,9 @@ class WriteNebulaEdgeConfig(space: String,
rankAsProp: Boolean,
user: String,
passwd: String,
writeMode: String)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode) {
writeMode: String,
overwrite: Boolean)
extends WriteNebulaConfig(space, user, passwd, batch, writeMode, overwrite) {
def getEdgeName = edgeName
def getSrcFiled = srcFiled
def getSrcPolicy = if (srcPolicy == null) "" else srcPolicy
Expand Down Expand Up @@ -487,6 +503,9 @@ object WriteNebulaEdgeConfig {
/** write mode for nebula, insert or update */
var writeMode: String = WriteMode.INSERT.toString

/** whether overwrite the exists edge */
var overwrite: Boolean = true

/**
* set space name
*/
Expand Down Expand Up @@ -600,6 +619,14 @@ object WriteNebulaEdgeConfig {
this
}

/**
* set whether overwrite the exists edge
*/
def withOverwrite(overwrite: Boolean): WriteEdgeConfigBuilder = {
this.overwrite = overwrite
this
}

/**
* check configs and get WriteNebulaEdgeConfig
*/
Expand All @@ -618,7 +645,8 @@ object WriteNebulaEdgeConfig {
rankAsProp,
user,
passwd,
writeMode)
writeMode,
overwrite)
}

private def check(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) exten
var rankAsProp: Boolean = _
var writeMode: WriteMode.Value = _
var deleteEdge: Boolean = _
var overwrite: Boolean = _

if (operaType == OperaType.WRITE) {
require(parameters.isDefinedAt(GRAPH_ADDRESS),
Expand Down Expand Up @@ -175,6 +176,7 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String]) exten
writeMode =
WriteMode.withName(parameters.getOrElse(WRITE_MODE, DEFAULT_WRITE_MODE).toString.toLowerCase)
deleteEdge = parameters.getOrElse(DELETE_EDGE, false).toString.toBoolean
overwrite = parameters.getOrElse(OVERWRITE, true).toString.toBoolean
}

def getReturnCols: List[String] = {
Expand Down Expand Up @@ -260,6 +262,7 @@ object NebulaOptions {
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
val OVERWRITE: String = "overwrite"

val DEFAULT_TIMEOUT: Int = 3000
val DEFAULT_CONNECTION_TIMEOUT: Int = 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package com.vesoft.nebula.connector

object NebulaTemplate {

private[connector] val BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s"
private[connector] val BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s"
private[connector] val BATCH_INSERT_NO_OVERWRITE_TEMPLATE =
"INSERT %s IF NOT EXISTS `%s`(%s) VALUES %s"
private[connector] val VERTEX_VALUE_TEMPLATE = "%s: (%s)"
private[connector] val VERTEX_VALUE_TEMPLATE_WITH_POLICY = "%s(\"%s\"): (%s)"
private[connector] val ENDPOINT_TEMPLATE = "%s(\"%s\")"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package com.vesoft.nebula.connector.writer

import com.vesoft.nebula.PropertyType
import com.vesoft.nebula.connector.NebulaTemplate.{
BATCH_INSERT_NO_OVERWRITE_TEMPLATE,
BATCH_INSERT_TEMPLATE,
DELETE_EDGE_TEMPLATE,
DELETE_VERTEX_TEMPLATE,
Expand Down Expand Up @@ -215,8 +216,8 @@ object NebulaExecutor {
/**
* construct insert statement for vertex
*/
def toExecuteSentence(tagName: String, vertices: NebulaVertices): String = {
BATCH_INSERT_TEMPLATE.format(
def toExecuteSentence(tagName: String, vertices: NebulaVertices, overwrite: Boolean): String = {
(if (overwrite) BATCH_INSERT_TEMPLATE else BATCH_INSERT_NO_OVERWRITE_TEMPLATE).format(
DataTypeEnum.VERTEX.toString,
tagName,
vertices.propertyNames,
Expand Down Expand Up @@ -244,7 +245,7 @@ object NebulaExecutor {
/**
* construct insert statement for edge
*/
def toExecuteSentence(edgeName: String, edges: NebulaEdges): String = {
def toExecuteSentence(edgeName: String, edges: NebulaEdges, overwrite: Boolean): String = {
val values = edges.values
.map { edge =>
val source = edges.getSourcePolicy match {
Expand Down Expand Up @@ -278,7 +279,8 @@ object NebulaExecutor {
EDGE_VALUE_TEMPLATE.format(source, target, edge.rank.get, edge.propertyValues)
}
.mkString(", ")
BATCH_INSERT_TEMPLATE.format(DataTypeEnum.EDGE.toString, edgeName, edges.propertyNames, values)
(if (overwrite) BATCH_INSERT_TEMPLATE else BATCH_INSERT_NO_OVERWRITE_TEMPLATE)
.format(DataTypeEnum.EDGE.toString, edgeName, edges.propertyNames, values)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ package object connector {
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down Expand Up @@ -296,6 +297,7 @@ package object connector {
.option(NebulaOptions.DST_AS_PROP, writeConfig.getDstAsProp)
.option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
def execute(): Unit = {
val nebulaEdges = NebulaEdges(propNames, edges.toList, srcPolicy, dstPolicy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges, nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaEdges)
case WriteMode.DELETE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
def execute(): Unit = {
val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaVertices)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label,
nebulaVertices,
nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaVertices)
case WriteMode.DELETE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,19 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
vertices.append(NebulaVertex("\"vid2\"", props2))

val nebulaVertices = NebulaVertices(propNames, vertices.toList, None)
val vertexStatement = NebulaExecutor.toExecuteSentence(tagName, nebulaVertices)
val vertexStatement = NebulaExecutor.toExecuteSentence(tagName, nebulaVertices, true)

val expectStatement = "INSERT vertex `person`(`col_string`,`col_fixed_string`,`col_bool`," +
"`col_int`,`col_int64`,`col_double`,`col_date`,`col_geo`) VALUES \"vid1\": (" + props1
.mkString(", ") + "), \"vid2\": (" + props2.mkString(", ") + ")"
assert(expectStatement.equals(vertexStatement))

val vertexWithoutOverwriteStatement =
NebulaExecutor.toExecuteSentence(tagName, nebulaVertices, false)
val expectWithoutOverwriteStatement = "INSERT vertex IF NOT EXISTS `person`(`col_string`," +
"`col_fixed_string`,`col_bool`,`col_int`,`col_int64`,`col_double`,`col_date`,`col_geo`) " +
"VALUES \"vid1\": (" + props1.mkString(", ") + "), \"vid2\": (" + props2.mkString(", ") + ")"
assert(expectWithoutOverwriteStatement.equals(vertexWithoutOverwriteStatement))
}

test("test toExecuteSentence for vertex with hash policy") {
Expand All @@ -167,7 +174,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
vertices.append(NebulaVertex("vid2", props2))

val nebulaVertices = NebulaVertices(propNames, vertices.toList, Some(KeyPolicy.HASH))
val vertexStatement = NebulaExecutor.toExecuteSentence(tagName, nebulaVertices)
val vertexStatement = NebulaExecutor.toExecuteSentence(tagName, nebulaVertices, true)

val expectStatement = "INSERT vertex `person`(`col_string`,`col_fixed_string`,`col_bool`," +
"`col_int`,`col_int64`,`col_double`,`col_date`,`col_geo`) VALUES hash(\"vid1\"): (" + props1
Expand Down Expand Up @@ -201,12 +208,20 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
edges.append(NebulaEdge("\"vid2\"", "\"vid1\"", Some(2L), props2))

val nebulaEdges = NebulaEdges(propNames, edges.toList, None, None)
val edgeStatement = NebulaExecutor.toExecuteSentence(edgeName, nebulaEdges)
val edgeStatement = NebulaExecutor.toExecuteSentence(edgeName, nebulaEdges, true)

val expectStatement = "INSERT edge `friend`(`col_string`,`col_fixed_string`,`col_bool`,`col_int`" +
",`col_int64`,`col_double`,`col_date`,`col_geo`) VALUES \"vid1\"->\"vid2\"@1: (" +
props1.mkString(", ") + "), \"vid2\"->\"vid1\"@2: (" + props2.mkString(", ") + ")"
assert(expectStatement.equals(edgeStatement))

val edgeWithoutOverwriteStatement =
NebulaExecutor.toExecuteSentence(edgeName, nebulaEdges, false)
val expectWithoutOverwriteStatement = "INSERT edge IF NOT EXISTS `friend`(`col_string`," +
"`col_fixed_string`,`col_bool`,`col_int`,`col_int64`,`col_double`,`col_date`,`col_geo`) " +
"VALUES \"vid1\"->\"vid2\"@1: (" + props1.mkString(", ") + "), \"vid2\"->\"vid1\"@2: (" +
props2.mkString(", ") + ")"
assert(expectWithoutOverwriteStatement.equals(edgeWithoutOverwriteStatement))
}

test("test toUpdateExecuteSentence for vertex") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ package object connector {
.option(NebulaOptions.BATCH, writeConfig.getBatch)
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down Expand Up @@ -358,6 +359,7 @@ package object connector {
.option(NebulaOptions.DST_AS_PROP, writeConfig.getDstAsProp)
.option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
def execute(): Unit = {
val nebulaEdges = NebulaEdges(propNames, edges.toList, srcPolicy, dstPolicy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges, nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaEdges)
case WriteMode.DELETE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
private def execute(): Unit = {
val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaVertices)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label,
nebulaVertices,
nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaVertices)
case WriteMode.DELETE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ package object connector {
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.DELETE_EDGE, writeConfig.getDeleteEdge)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down Expand Up @@ -257,6 +258,7 @@ package object connector {
.option(NebulaOptions.DST_AS_PROP, writeConfig.getDstAsProp)
.option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.OVERWRITE, writeConfig.isOverwrite)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class NebulaEdgeWriter(nebulaOptions: NebulaOptions,
def execute(): Unit = {
val nebulaEdges = NebulaEdges(propNames, edges.toList, srcPolicy, dstPolicy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaEdges, nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaEdges)
case WriteMode.DELETE =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ class NebulaVertexWriter(nebulaOptions: NebulaOptions, vertexIndex: Int, schema:
def execute(): Unit = {
val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
val exec = nebulaOptions.writeMode match {
case WriteMode.INSERT => NebulaExecutor.toExecuteSentence(nebulaOptions.label, nebulaVertices)
case WriteMode.INSERT =>
NebulaExecutor.toExecuteSentence(nebulaOptions.label,
nebulaVertices,
nebulaOptions.overwrite)
case WriteMode.UPDATE =>
NebulaExecutor.toUpdateExecuteStatement(nebulaOptions.label, nebulaVertices)
case WriteMode.DELETE =>
Expand Down
Loading

0 comments on commit 4280cf4

Please sign in to comment.