diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala index b4c145cf..a1c1724b 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala @@ -35,6 +35,13 @@ object SslType extends Enumeration { val SELF = Value("self") } +object WriteMode extends Enumeration { + type Mode = Value + val INSERT = Value("insert") + val UPDATE = Value("update") + val DELETE = Value("delete") +} + /** * DataBaseConfigEntry describe the nebula cluster's address and which space will be used. * @@ -261,6 +268,7 @@ object Configs { private[this] val DEFAULT_STREAM_INTERVAL = 30 private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest" private[this] val DEFAULT_PARALLEL = 1 + private[this] val DEFAULT_WRITE_MODE = "INSERT" /** * @@ -429,7 +437,10 @@ object Configs { val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig) LOG.info(s"Sink Config ${sourceConfig}") - val batch = getOrElse(tagConfig, "batch", DEFAULT_BATCH) + // val writeMode = toWriteModeCategory(tagConfig.getString("writeMode")) + val writeModeStr = getOrElse(tagConfig, "writeMode", DEFAULT_WRITE_MODE) + val writeMode = toWriteModeCategory(writeModeStr) + val batch = getOrElse(tagConfig, "batch", DEFAULT_BATCH) val checkPointPath = if (tagConfig.hasPath("check_point_path")) Some(tagConfig.getString("check_point_path")) else DEFAULT_CHECK_POINT_PATH @@ -440,6 +451,7 @@ object Configs { val partition = getOrElse(tagConfig, "partition", DEFAULT_PARTITION) val repartitionWithNebula = getOrElse(tagConfig, "repartitionWithNebula", true) val ignoreIndex = getOrElse(tagConfig, "ignoreIndex", false) + val deleteEdge = getOrElse(tagConfig, "deleteEdge", false) val vertexUdf = if (tagConfig.hasPath("vertex.udf")) { val sep = tagConfig.getString("vertex.udf.separator") @@ -455,6 +467,7 @@ object Configs { sinkConfig, fields, nebulaFields, + writeMode, vertexField, policyOpt, prefix, @@ -464,6 +477,7 @@ object Configs { repartitionWithNebula, enableTagless, ignoreIndex, + deleteEdge, vertexUdf ) LOG.info(s"Tag Config: ${entry}") @@ -562,7 +576,9 @@ object Configs { None } - val batch = getOrElse(edgeConfig, "batch", DEFAULT_BATCH) + val writeModeStr = getOrElse(edgeConfig, "writeMode", DEFAULT_WRITE_MODE) + val writeMode = toWriteModeCategory(writeModeStr) + val batch = getOrElse(edgeConfig, "batch", DEFAULT_BATCH) val checkPointPath = if (edgeConfig.hasPath("check_point_path")) Some(edgeConfig.getString("check_point_path")) else DEFAULT_CHECK_POINT_PATH @@ -595,6 +611,7 @@ object Configs { sinkConfig, fields, nebulaFields, + writeMode, sourceField, sourcePolicy, sourcePrefix, @@ -672,6 +689,21 @@ object Configs { } } + /** + * Use to get write mode according to category of writeMode. + * + * @param category + * @return + */ + private[this] def toWriteModeCategory(category: String): WriteMode.Mode = { + category.trim.toUpperCase match { + case "INSERT" => WriteMode.INSERT + case "UPDATE" => WriteMode.UPDATE + case "DELETE" => WriteMode.DELETE + case _ => throw new IllegalArgumentException(s"${category} not support") + } + } + /** * Use to generate data source com.vesoft.exchange.common.config according to category of source. * diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala index 145fafc6..5ad5e93c 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala @@ -35,6 +35,9 @@ sealed trait SchemaConfigEntry { /** check point path */ def checkPointPath: Option[String] + + /** write mode */ + def writeMode: WriteMode.Mode } /** @@ -55,6 +58,7 @@ case class TagConfigEntry(override val name: String, override val dataSinkConfigEntry: DataSinkConfigEntry, override val fields: List[String], override val nebulaFields: List[String], + override val writeMode: WriteMode.Mode, vertexField: String, vertexPolicy: Option[KeyPolicy.Value], vertexPrefix: String, @@ -64,6 +68,7 @@ case class TagConfigEntry(override val name: String, repartitionWithNebula: Boolean = true, enableTagless: Boolean = false, ignoreIndex: Boolean = false, + deleteEdge: Boolean = false, vertexUdf: Option[UdfConfigEntry] = None) extends SchemaConfigEntry { require( @@ -74,6 +79,7 @@ case class TagConfigEntry(override val name: String, s"Tag name: $name, " + s"source: $dataSourceConfigEntry, " + s"sink: $dataSinkConfigEntry, " + + s"writeMode: $writeMode, " + s"vertex field: $vertexField, " + s"vertex policy: $vertexPolicy, " + s"batch: $batch, " + @@ -109,6 +115,7 @@ case class EdgeConfigEntry(override val name: String, override val dataSinkConfigEntry: DataSinkConfigEntry, override val fields: List[String], override val nebulaFields: List[String], + override val writeMode: WriteMode.Mode, sourceField: String, sourcePolicy: Option[KeyPolicy.Value], sourcePrefix: String, @@ -136,6 +143,7 @@ case class EdgeConfigEntry(override val name: String, s"Edge name: $name, " + s"source: $dataSourceConfigEntry, " + s"sink: $dataSinkConfigEntry, " + + s"writeMode: $writeMode, " + s"latitude: $latitude, " + s"longitude: $longitude, " + s"source field: $sourceField, " + @@ -152,6 +160,7 @@ case class EdgeConfigEntry(override val name: String, s"Edge name: $name, " + s"source: $dataSourceConfigEntry, " + s"sink: $dataSinkConfigEntry, " + + s"writeMode: $writeMode, " + s"source field: $sourceField, " + s"source policy: $sourcePolicy, " + s"ranking: $rankingField, " + diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala index 5ba818ae..1e9dc0a8 100644 --- a/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala +++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/writer/ServerBaseWriter.scala @@ -6,17 +6,10 @@ package com.vesoft.exchange.common.writer import java.util.concurrent.TimeUnit - import com.google.common.util.concurrent.RateLimiter import com.vesoft.exchange.common.GraphProvider import com.vesoft.exchange.common.{Edges, KeyPolicy, Vertices} -import com.vesoft.exchange.common.config.{ - DataBaseConfigEntry, - RateConfigEntry, - SchemaConfigEntry, - Type, - UserConfigEntry -} +import com.vesoft.exchange.common.config.{DataBaseConfigEntry, EdgeConfigEntry, RateConfigEntry, SchemaConfigEntry, TagConfigEntry, Type, UserConfigEntry, WriteMode} import com.vesoft.nebula.ErrorCode import org.apache.log4j.Logger @@ -30,6 +23,12 @@ abstract class ServerBaseWriter extends Writer { private[this] val EDGE_VALUE_WITHOUT_RANKING_TEMPLATE = "%s->%s: (%s)" private[this] val EDGE_VALUE_TEMPLATE = "%s->%s@%d: (%s)" + private[this] val BATCH_DELETE_VERTEX_TEMPLATE = "DELETE %s %s" + private[this] val BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE = "DELETE %s %s WITH EDGE" + private[this] val DELETE_VALUE_TEMPLATE = "%s" + private[this] val BATCH_DELETE_EDGE_TEMPLATE = "DELETE %s `%s` %s" + private[this] val EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d" + def toExecuteSentence(name: String, vertices: Vertices, ignoreIndex: Boolean): String = { { if (ignoreIndex) BATCH_INSERT_IGNORE_INDEX_TEMPLATE else BATCH_INSERT_TEMPLATE } .format( @@ -58,6 +57,32 @@ abstract class ServerBaseWriter extends Writer { ) } + def toDeleteExecuteSentence(vertices: Vertices, deleteEdge: Boolean): String = { + { if (deleteEdge) BATCH_DELETE_VERTEX_WITH_EDGE_TEMPLATE else BATCH_DELETE_VERTEX_TEMPLATE } + .format( + Type.VERTEX.toString, + vertices.values + .map { vertex => + if (vertices.policy.isEmpty) { + DELETE_VALUE_TEMPLATE.format(vertex.vertexID) + } else { + vertices.policy.get match { + case KeyPolicy.HASH => + ENDPOINT_TEMPLATE + .format(KeyPolicy.HASH.toString, vertex.vertexID) + case KeyPolicy.UUID => + ENDPOINT_TEMPLATE + .format(KeyPolicy.UUID.toString, vertex.vertexID) + case _ => + throw new IllegalArgumentException( + s"invalidate vertex policy ${vertices.policy.get}") + } + } + } + .mkString(", ") + ) + } + def toExecuteSentence(name: String, edges: Edges, ignoreIndex: Boolean): String = { val values = edges.values .map { edge => @@ -100,6 +125,40 @@ abstract class ServerBaseWriter extends Writer { values) } + def toDeleteExecuteSentence(edgeName: String, edges: Edges): String = { + BATCH_DELETE_EDGE_TEMPLATE.format( + Type.EDGE.toString, + edgeName, + edges.values + .map { value => + EDGE_ENDPOINT_TEMPLATE.format( + edges.sourcePolicy match { + case Some(KeyPolicy.HASH) => + ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.source) + case Some(KeyPolicy.UUID) => + ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.source) + case None => value.source + case _ => + throw new IllegalArgumentException( + s"source vertex policy ${edges.sourcePolicy.get} is not supported") + }, + edges.targetPolicy match { + case Some(KeyPolicy.HASH) => + ENDPOINT_TEMPLATE.format(KeyPolicy.HASH.toString, value.destination) + case Some(KeyPolicy.UUID) => + ENDPOINT_TEMPLATE.format(KeyPolicy.UUID.toString, value.destination) + case None => value.destination + case _ => + throw new IllegalArgumentException( + s"target vertex policy ${edges.targetPolicy.get} is not supported") + }, + if (value.ranking.isEmpty) 0 else value.ranking.get + ) + } + .mkString(", ") + ) + } + def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String def writeEdges(edges: Edges, ignoreIndex: Boolean): String @@ -137,10 +196,41 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}") } + def execute(vertices: Vertices, writeMode: WriteMode.Mode): String = { + val sentence = writeMode match { + case WriteMode.INSERT => + toExecuteSentence(config.name, vertices, config.asInstanceOf[TagConfigEntry].ignoreIndex) + case WriteMode.UPDATE => + // TODO: add definition and implementation for update + toExecuteSentence(config.name, vertices, config.asInstanceOf[TagConfigEntry].ignoreIndex) + case WriteMode.DELETE => + toDeleteExecuteSentence(vertices, config.asInstanceOf[TagConfigEntry].deleteEdge) + case _ => + throw new IllegalArgumentException(s"write mode ${writeMode} not supported.") + } + sentence + } + + def execute(edges: Edges, writeMode: WriteMode.Mode): String = { + val sentence = writeMode match { + case WriteMode.INSERT => + toExecuteSentence(config.name, edges, config.asInstanceOf[EdgeConfigEntry].ignoreIndex) + case WriteMode.UPDATE => + // TODO: add definition and implementation for update + toExecuteSentence(config.name, edges, config.asInstanceOf[EdgeConfigEntry].ignoreIndex) + case WriteMode.DELETE => + toDeleteExecuteSentence(config.name, edges) + case _ => + throw new IllegalArgumentException(s"write mode ${writeMode} not supported.") + } + sentence + } + override def writeVertices(vertices: Vertices, ignoreIndex: Boolean = false): String = { - val sentence = toExecuteSentence(config.name, vertices, ignoreIndex) + // val sentence = toExecuteSentence(config.name, vertices, ignoreIndex) + val statement = execute(vertices, config.asInstanceOf[TagConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { - val result = graphProvider.submit(session, sentence) + val result = graphProvider.submit(session, statement) if (result.isSucceeded) { LOG.info( s" write ${config.name}, batch size(${vertices.values.size}), latency(${result.getLatency})") @@ -154,11 +244,12 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, } else { LOG.error(s"write vertex failed because write speed is too fast") } - sentence + statement } override def writeEdges(edges: Edges, ignoreIndex: Boolean = false): String = { val sentence = toExecuteSentence(config.name, edges, ignoreIndex) + val statement = execute(edges, config.asInstanceOf[EdgeConfigEntry].writeMode) if (rateLimiter.tryAcquire(rateConfig.timeout, TimeUnit.MILLISECONDS)) { val result = graphProvider.submit(session, sentence) if (result.isSucceeded) { diff --git a/exchange-common/src/test/resources/application.conf b/exchange-common/src/test/resources/application.conf index 759fd7db..d97a0b63 100644 --- a/exchange-common/src/test/resources/application.conf +++ b/exchange-common/src/test/resources/application.conf @@ -91,6 +91,7 @@ source: parquet sink: client } + writeMode: INSERT path: path0 fields: [parquet-field-0, parquet-field-1, parquet-field-2] nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] @@ -110,6 +111,7 @@ source: csv sink: sst } + writeMode: INSERT path: path1 # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields fields: [csv-field-0, csv-field-1, csv-field-2] @@ -130,6 +132,7 @@ source: json sink: client } + writeMode: DELETE path: path3 fields: [json-field-0, json-field-1, json-field-2] nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] @@ -148,6 +151,7 @@ source: hive sink: client } + writeMode: INSERT exec: "select hive-field0, hive-field1, hive-field2 from database.table" fields: [hive-field-0, hive-field-1, hive-field-2] nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2] @@ -166,6 +170,7 @@ source: neo4j sink: client } + writeMode: INSERT server: "bolt://127.0.0.1:7687" user: neo4j password: neo4j @@ -189,6 +194,7 @@ source: hbase sink: client } + writeMode: INSERT host:127.0.0.1 port:2181 table:hbase-table @@ -209,6 +215,7 @@ source: pulsar sink: client } + writeMode: INSERT service: "pulsar://localhost:6650" admin: "http://localhost:8081" options: { @@ -251,6 +258,7 @@ source: mysql sink: client } + writeMode: INSERT user:root host: "127.0.0.1" port: "3306" @@ -276,6 +284,7 @@ source: postgresql sink: client } + writeMode: INSERT user:root host: "127.0.0.1" port: "5432" diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala index 90fcf70d..a52096b1 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/config/ConfigsSuite.scala @@ -6,26 +6,9 @@ package scala.com.vesoft.nebula.exchange.config import java.io.File - import com.vesoft.exchange.Argument import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{ - CaSignParam, - Configs, - DataBaseConfigEntry, - FileBaseSourceConfigEntry, - FileDataSourceConfigEntry, - HBaseSourceConfigEntry, - HiveSourceConfigEntry, - MySQLSourceConfigEntry, - Neo4JSourceConfigEntry, - PostgreSQLSourceConfigEntry, - SelfSignParam, - SinkCategory, - SourceCategory, - SslConfigEntry, - SslType -} +import com.vesoft.exchange.common.config.{CaSignParam, Configs, DataBaseConfigEntry, FileBaseSourceConfigEntry, FileDataSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, SelfSignParam, SinkCategory, SourceCategory, SslConfigEntry, SslType, WriteMode} import com.vesoft.nebula.client.graph.data.CASignedSSLParam import org.apache.log4j.Logger import org.junit.Test @@ -95,6 +78,10 @@ class ConfigsSuite { val sink = tagConfig.dataSinkConfigEntry assert(sink.category == SinkCategory.CLIENT || sink.category == SinkCategory.SST) + val writeMode = tagConfig.writeMode + assert(writeMode == WriteMode.INSERT || writeMode == WriteMode.UPDATE + || writeMode == WriteMode.DELETE) + val label = tagConfig.name val batch = tagConfig.batch val partition = tagConfig.partition @@ -184,6 +171,9 @@ class ConfigsSuite { val label = edgeConfig.name val batch = edgeConfig.batch val partition = edgeConfig.partition + val writeMode = edgeConfig.writeMode + assert(writeMode == WriteMode.INSERT || writeMode == WriteMode.UPDATE + || writeMode == WriteMode.DELETE) val sourceField = edgeConfig.sourceField val targetField = edgeConfig.targetField diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala index 3cb82aa7..20bcd2ec 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/utils/NebulaUtilsSuite.scala @@ -12,12 +12,7 @@ import com.vesoft.nebula.client.graph.NebulaPoolConfig import com.vesoft.nebula.client.graph.data.HostAddress import com.vesoft.nebula.client.graph.net.NebulaPool import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{ - NebulaSinkConfigEntry, - SinkCategory, - SslConfigEntry, - TagConfigEntry -} +import com.vesoft.exchange.common.config.{NebulaSinkConfigEntry, SinkCategory, SslConfigEntry, TagConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils import org.apache.log4j.Logger import org.junit.{After, Before, Test} @@ -80,11 +75,13 @@ class NebulaUtilsSuite { "col14") val label = "person" val dataSinkConfigEntry = NebulaSinkConfigEntry(SinkCategory.SST, List("127.0.0.1:9669")) + val writeMode = WriteMode.INSERT val sourceConfig = TagConfigEntry(label, null, dataSinkConfigEntry, sourceFields, nebulaFields, + writeMode, "id", Some(KeyPolicy.UUID), null, @@ -135,6 +132,7 @@ class NebulaUtilsSuite { dataSinkConfigEntry, sourceFields, wrongNebulaFields, + writeMode, "id", Some(KeyPolicy.UUID), null, diff --git a/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala b/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala index 4d50cee8..82d6ce27 100644 --- a/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala +++ b/exchange-common/src/test/scala/com/vesoft/exchange/common/writer/ServerBaseWriterSuite.scala @@ -33,6 +33,23 @@ class ServerBaseWriterSuite extends ServerBaseWriter { assert(sentence.equals(expectSentence)) } + @Test + def toDeleteExecuteSentenceSuiteForVertex(): Unit = { + val vertices: ListBuffer[Vertex] = new ListBuffer[Vertex] + val propNames = List("name", "age", "gender", "high", "weight") + + val props1 = List("\"Tom\"", 10, 0, 172.5, 55) + val props2 = List("\"Jena\"", 12, 1, 165.5, 45) + vertices.append(Vertex("\"vid1\"", props1)) + vertices.append(Vertex("\"vid2\"", props2)) + val nebulaVertices = Vertices(propNames, vertices.toList) + + val sentence = toDeleteExecuteSentence(nebulaVertices, false) + val expectSentence = + "DELETE VERTEX \"vid1\", \"vid2\"" + assert(sentence.equals(expectSentence)) + } + @Test def toExecuteSentenceSuiteForVertexWithSymbol(): Unit = { val vertices: ListBuffer[Vertex] = new ListBuffer[Vertex] @@ -71,6 +88,26 @@ class ServerBaseWriterSuite extends ServerBaseWriter { assert(sentence.equals(expectSentence)) } + @Test + def toDeleteExecuteSentenceSuiteForEdge(): Unit = { + val edges: ListBuffer[Edge] = new ListBuffer[Edge] + val edgeType = "friend" + val propNames = List("src_name", "dst_name", "time", "address", "relation") + + val props1 = List("\"Tom\"", "\"Jena\"", "2022-08-25", "hangzhou", "friend") + val props2 = List("\"Jena\"", "\"Bob\"", "2022-08-25", "shanghai", "friend") + edges.append(Edge("\"vid1\"", "\"vid2\"", Some(0L), props1)) + edges.append(Edge("\"vid2\"", "\"vid3\"", Some(1L), props2)) + val nebulaEdges = Edges(propNames, edges.toList) + val sentence = toDeleteExecuteSentence(edgeType, nebulaEdges) + val expectSentence = "DELETE EDGE `friend` " + + "\"vid1\"->\"vid2\"@0, " + + "\"vid2\"->\"vid3\"@1" + println(sentence) + println(expectSentence) + assert(sentence.equals(expectSentence)) + } + @Test def toExecuteSentenceSuiteForEdgeWithSymbol(): Unit = { val edges: ListBuffer[Edge] = new ListBuffer[Edge] @@ -89,6 +126,7 @@ class ServerBaseWriterSuite extends ServerBaseWriter { assert(sentence.equals(expectSentence)) } + override def writeVertices(vertices: Vertices, ignoreIndex: Boolean): String = ??? override def writeEdges(edges: common.Edges, ignoreIndex: Boolean): String = ??? diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index e78cb6f4..d468eda7 100644 --- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -6,25 +6,15 @@ package com.vesoft.nebula.exchange.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry} +import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, SchemaProp} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -77,11 +67,13 @@ class EdgeProcessorSuite { StructField("dst", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val edgeConfigEntry = EdgeConfigEntry("friend", null, null, fieldKeys, nebulaKeys, + writeMode, "src", None, null, @@ -115,6 +107,7 @@ class EdgeProcessorSuite { null, fieldKeys, nebulaKeys, + writeMode, "src", Some(KeyPolicy.HASH), null, @@ -150,11 +143,13 @@ class EdgeProcessorSuite { assert(edge.toString.equals( "Edge: \"1\"->\"2\"@0 values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")")) + val writeMode = WriteMode.INSERT val edgeConfigEntryWithPrefix = EdgeConfigEntry("friend", null, null, fieldKeys, nebulaKeys, + writeMode, "src", None, "src", diff --git a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index 7fb90975..36526133 100644 --- a/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.2/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -6,26 +6,16 @@ package com.vesoft.nebula.exchange.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, TagConfigEntry} +import com.vesoft.exchange.common.config.{Configs, TagConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.log4j.Logger import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -78,8 +68,9 @@ class VerticesProcessorSuite { val schema: StructType = StructType(List(StructField("id", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None) + TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true)) @@ -101,6 +92,7 @@ class VerticesProcessorSuite { null, List(), List(), + writeMode, "id", Some(KeyPolicy.HASH), null, @@ -125,6 +117,7 @@ class VerticesProcessorSuite { null, List(), List(), + writeMode, "id", Some(KeyPolicy.HASH), "prefix", @@ -144,9 +137,9 @@ class VerticesProcessorSuite { assert(vertex.vertexID.equals("\"1\"")) assert(vertex.toString.equals( "Vertex ID: \"1\", Values: \"\", \"fixedBob\", 12, 200, 1000, 100000, date(\"2021-01-01\"), datetime(\"2021-01-01T12:00:00.100\"), time(\"12:00:00.100\"), 345436232, true, 12.01, 22.12, ST_GeogFromText(\"POINT(3 8)\")")) - + val writeMode = WriteMode.INSERT val tagConfigEntryWithPrefix = - TagConfigEntry("person", null, null, List(), List(), "id", None, "prefix", 10, 10, None) + TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, "prefix", 10, 10, None) val vertexWithPrefix = processClazz.convertToVertex(row, tagConfigEntryWithPrefix, true, fieldKeys, map) assert(vertexWithPrefix.vertexID.equals("\"prefix_1\"")) diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index c2c79b70..509df544 100644 --- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -6,27 +6,17 @@ package com.vesoft.exchange.common.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry} +import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.exchange.processor.EdgeProcessor import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -78,11 +68,13 @@ class EdgeProcessorSuite { StructField("dst", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val edgeConfigEntry = EdgeConfigEntry("friend", null, null, fieldKeys, nebulaKeys, + writeMode, "src", None, null, @@ -116,6 +108,7 @@ class EdgeProcessorSuite { null, fieldKeys, nebulaKeys, + writeMode, "src", Some(KeyPolicy.HASH), null, diff --git a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index 9a8dac6e..ad030f59 100644 --- a/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_2.4/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -6,26 +6,16 @@ package com.vesoft.exchange.common.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, TagConfigEntry} +import com.vesoft.exchange.common.config.{Configs, TagConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.exchange.processor.VerticesProcessor import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -76,8 +66,9 @@ class VerticesProcessorSuite { val schema: StructType = StructType(List(StructField("id", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None) + TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true)) @@ -99,6 +90,7 @@ class VerticesProcessorSuite { null, List(), List(), + writeMode, "id", Some(KeyPolicy.HASH), null, diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala index f4671529..e7cea2cb 100644 --- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala +++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/EdgeProcessorSuite.scala @@ -6,25 +6,15 @@ package com.vesoft.nebula.exchange.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry} +import com.vesoft.exchange.common.config.{Configs, EdgeConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, EdgeItem, Schema, SchemaProp} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -77,11 +67,13 @@ class EdgeProcessorSuite { StructField("dst", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val edgeConfigEntry = EdgeConfigEntry("friend", null, null, fieldKeys, nebulaKeys, + writeMode, "src", None, null, @@ -115,6 +107,7 @@ class EdgeProcessorSuite { null, fieldKeys, nebulaKeys, + writeMode, "src", Some(KeyPolicy.HASH), null, diff --git a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala index ed918654..205545bc 100644 --- a/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala +++ b/nebula-exchange_spark_3.0/src/test/scala/com/vesoft/nebula/exchange/processor/VerticesProcessorSuite.scala @@ -6,25 +6,15 @@ package com.vesoft.nebula.exchange.processor import java.io.File - import com.vesoft.exchange.common.VidType import com.vesoft.nebula.PropertyType import com.vesoft.exchange.common.KeyPolicy -import com.vesoft.exchange.common.config.{Configs, TagConfigEntry} +import com.vesoft.exchange.common.config.{Configs, TagConfigEntry, WriteMode} import com.vesoft.exchange.common.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.meta.{ColumnDef, ColumnTypeDef, Schema, SchemaProp, TagItem} import org.apache.commons.codec.binary.Hex import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.types.{ - BooleanType, - DoubleType, - IntegerType, - LongType, - ShortType, - StringType, - StructField, - StructType -} +import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, LongType, ShortType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.junit.Test import org.scalatest.Assertions.assertThrows @@ -75,8 +65,9 @@ class VerticesProcessorSuite { val schema: StructType = StructType(List(StructField("id", StringType, nullable = true))) val stringIdRow = new GenericRowWithSchema(stringIdValue.toArray, schema) val intIdRow = new GenericRowWithSchema(intIdValue.toArray, schema) + val writeMode = WriteMode.INSERT val tagConfigEntry = - TagConfigEntry("person", null, null, List(), List(), "id", None, null, 10, 10, None) + TagConfigEntry("person", null, null, List(), List(), writeMode, "id", None, null, 10, 10, None) // test for string id value without policy assert(processClazz.isVertexValid(stringIdRow, tagConfigEntry, false, true)) @@ -98,6 +89,7 @@ class VerticesProcessorSuite { null, List(), List(), + writeMode, "id", Some(KeyPolicy.HASH), null,