diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala index 43da472e..9962e712 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/mock/SparkMock.scala @@ -176,4 +176,44 @@ object SparkMock { spark.stop() } + /** + * write nebula vertex with delete_with_edge mode + */ + def deleteVertexWithEdge(): Unit = { + val sparkConf = new SparkConf + sparkConf + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol])) + val spark = SparkSession + .builder() + .master("local") + .config(sparkConf) + .getOrCreate() + + val df = spark.read + .option("header", true) + .csv("src/test/resources/vertex.csv") + + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withGraphAddress("127.0.0.1:9669") + .withConenctionRetry(2) + .build() + val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig + .builder() + .withSpace("test_write_string") + .withTag("person_connector") + .withVidField("id") + .withVidAsProp(false) + .withWriteMode(WriteMode.DELETE) + .withDeleteEdge(true) + .withBatch(5) + .build() + df.write.nebula(config, nebulaWriteVertexConfig).writeVertices() + + spark.stop() + } + } diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala index 58c725e7..b5a062ec 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala @@ -335,9 +335,13 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll { vertices.append(NebulaVertex("\"vid2\"", List())) val nebulaVertices = NebulaVertices(List(), vertices.toList, None) - val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices) + val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, false) val expectVertexDeleteStatement = "DELETE VERTEX \"vid1\",\"vid2\"" assert(expectVertexDeleteStatement.equals(vertexStatement)) + + val vertexWithEdgeStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, true) + val expectVertexWithEdgeDeleteStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE" + assert(expectVertexWithEdgeDeleteStatement.equals(vertexWithEdgeStatement)) } test("test toDeleteExecuteStatement for vertex with HASH policy") { @@ -346,9 +350,14 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll { vertices.append(NebulaVertex("vid2", List())) val nebulaVertices = NebulaVertices(List(), vertices.toList, Some(KeyPolicy.HASH)) - val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices) + val vertexStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, false) val expectVertexDeleteStatement = "DELETE VERTEX hash(\"vid1\"),hash(\"vid2\")" assert(expectVertexDeleteStatement.equals(vertexStatement)) + + val vertexWithEdgeStatement = NebulaExecutor.toDeleteExecuteStatement(nebulaVertices, true) + val expectVertexWithEdgeDeleteStatement = + "DELETE VERTEX hash(\"vid1\"),hash(\"vid2\") WITH EDGE" + assert(expectVertexWithEdgeDeleteStatement.equals(vertexWithEdgeStatement)) } test("test toDeleteExecuteStatement for edge") { diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala index e63fbf8c..2cb77486 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/WriteDeleteSuite.scala @@ -22,6 +22,7 @@ class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll { graphMock.mockIntIdGraphSchema() graphMock.close() SparkMock.writeVertex() + SparkMock.writeEdge() } test("write vertex into test_write_string space with delete mode") { @@ -36,6 +37,28 @@ class WriteDeleteSuite extends AnyFunSuite with BeforeAndAfterAll { assert(resultSet.isEmpty) } + test("write vertex into test_write_with_edge_string space with delete with edge mode") { + SparkMock.writeVertex() + SparkMock.writeEdge() + SparkMock.deleteVertexWithEdge() + val addresses: List[Address] = List(new Address("127.0.0.1", 9669)) + val graphProvider = new GraphProvider(addresses, 3000) + + graphProvider.switchSpace("root", "nebula", "test_write_string") + // assert vertex is deleted + val vertexResultSet: ResultSet = + graphProvider.submit("use test_write_string;match (v:person_connector) return v;") + assert(vertexResultSet.getColumnNames.size() == 0) + assert(vertexResultSet.isEmpty) + + // assert edge is deleted + val edgeResultSet: ResultSet = + graphProvider.submit("use test_write_string;fetch prop on friend_connector 1->2@10") + assert(edgeResultSet.getColumnNames.size() == 0) + assert(edgeResultSet.isEmpty) + + } + test("write edge into test_write_string space with delete mode") { SparkMock.deleteEdge() val addresses: List[Address] = List(new Address("127.0.0.1", 9669))