From 39c727d2e944c64fc8b95c9b5ca23685a48f04eb Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 16 Sep 2021 15:58:20 +0800 Subject: [PATCH 1/6] update flink connector version --- .../nebula/sink/NebulaBufferedRow.java | 39 --------------- .../nebula/sink/NebulaEdgeBatchExecutor.java | 8 ++++ .../sink/NebulaOutputFormatConverter.java | 22 --------- .../sink/NebulaVertexBatchExecutor.java | 8 ++++ .../connector/nebula/utils/NebulaEdge.java | 8 ++++ .../connector/nebula/utils/NebulaEdges.java | 8 ++++ .../connector/nebula/utils/NebulaVertex.java | 33 +++++++++++++ .../nebula/utils/NebulaVertices.java | 8 ++++ .../connector/nebula/utils/WriteModeEnum.java | 8 ++++ connector/src/main/resources/log4j.properties | 6 +++ .../flink/connector/nebula/MockData.java | 8 ++++ .../sink/AbstractNebulaOutPutFormatTest.java | 0 .../sink/NebulaEdgeBatchExecutorTest.java | 4 ++ .../sink/NebulaOutputFormatConverterTest.java | 47 +++++++++++++------ ...ebulaRowEdgeOutputFormatConverterTest.java | 4 ++ ...ulaRowVertexOutputFormatConverterTest.java | 4 ++ .../sink/NebulaVertexBatchExecutorTest.java | 4 ++ .../nebula/utils/NebulaEdgesTest.java | 8 ++++ .../nebula/utils/NebulaVerticesTest.java | 8 ++++ connector/src/test/resources/log4j.properties | 6 +++ example/pom.xml | 2 +- 21 files changed, 167 insertions(+), 76 deletions(-) delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBufferedRow.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java delete mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaOutputFormatConverter.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java create mode 100644 
connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java create mode 100644 connector/src/main/resources/log4j.properties create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/MockData.java rename connector/src/test/java/org/apache/flink/{ => connector/nebula}/sink/AbstractNebulaOutPutFormatTest.java (100%) create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java rename connector/src/test/java/org/apache/flink/{ => connector/nebula}/sink/NebulaOutputFormatConverterTest.java (75%) create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java create mode 100644 connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java create mode 100644 connector/src/test/resources/log4j.properties diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBufferedRow.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBufferedRow.java deleted file mode 100644 index 97ac159..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBufferedRow.java +++ /dev/null @@ -1,39 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -package org.apache.flink.connector.nebula.sink; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * buffer for batch write - */ -public class NebulaBufferedRow implements Serializable { - - private static final long serialVersionUID = -1364277720478588644L; - - private final List rows = new ArrayList<>(); - - public void putRow(String row) { - rows.add(row); - } - - - public List getRows() { - return rows; - } - - public void clean() { - rows.clear(); - } - - public int bufferSize() { - return rows.size(); - } - -} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java new file mode 100644 index 0000000..2d83385 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.sink;public class NebulaEdgeBatchExecutor { +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaOutputFormatConverter.java deleted file mode 100644 index 8d8f89d..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaOutputFormatConverter.java +++ /dev/null @@ -1,22 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
- */ - -package org.apache.flink.connector.nebula.sink; - -import java.io.Serializable; -import org.apache.flink.connector.nebula.utils.PolicyEnum; - -public interface NebulaOutputFormatConverter extends Serializable { - - /** - * convert row to nebula's insert values - * - * @param record flink record - * @param policy see {@link PolicyEnum} - * @return String - */ - String createValue(T record, PolicyEnum policy); -} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java new file mode 100644 index 0000000..baa4d18 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.sink;public class NebulaVertexBatchExecutor { +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java new file mode 100644 index 0000000..a0ac859 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula.utils;public class NebulaEdge { +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java new file mode 100644 index 0000000..ecd2d79 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.utils;public class NebulaEdges { +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java new file mode 100644 index 0000000..38e6a4b --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java @@ -0,0 +1,33 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula.utils; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; + +public class NebulaVertices implements Serializable { + + private String vid; + private List propValues; + + public String getVid() { + return vid; + } + + public void setVid(String vid) { + this.vid = vid; + } + + public String getPropValues() { + return propValues.stream().collect(Collectors.joining(",")); + } + + public void setPropValues(List propValues) { + this.propValues = propValues; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java new file mode 100644 index 0000000..86940f8 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.utils;public class NebulaVertices { +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java new file mode 100644 index 0000000..211eb4b --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula.utils;public class WriteMode { +} diff --git a/connector/src/main/resources/log4j.properties b/connector/src/main/resources/log4j.properties new file mode 100644 index 0000000..913391d --- /dev/null +++ b/connector/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +# Global logging configuration +log4j.rootLogger=INFO, stdout +# Console output... +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java new file mode 100644 index 0000000..f0ae8ad --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula;public class MockData { +} diff --git a/connector/src/test/java/org/apache/flink/sink/AbstractNebulaOutPutFormatTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java similarity index 100% rename from connector/src/test/java/org/apache/flink/sink/AbstractNebulaOutPutFormatTest.java rename to connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java new file mode 100644 index 0000000..e5dc885 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java @@ -0,0 +1,4 @@ +import junit.framework.TestCase; +public class NebulaEdgeBatchExecutorTest extends TestCase { + +} diff --git a/connector/src/test/java/org/apache/flink/sink/NebulaOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java similarity index 75% rename from connector/src/test/java/org/apache/flink/sink/NebulaOutputFormatConverterTest.java rename to connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java index 83fd069..25eb28d 100644 --- a/connector/src/test/java/org/apache/flink/sink/NebulaOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java @@ -15,6 +15,8 @@ import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaEdge; +import org.apache.flink.connector.nebula.utils.NebulaVertex; import 
org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.apache.flink.types.Row; @@ -64,8 +66,10 @@ public void testCreateVertexValue() { VidTypeEnum.STRING, schema); - String value = helper.createValue(row, null); - assert "\"2\": (\"Tom\",11)".equals(value); + NebulaVertex vertex = helper.createVertex(row, null); + assert(vertex.getVid().equals("\"2\"")); + assert(vertex.getPropValues().size() == 2); + assert(vertex.getPropValuesString().equals("\"Tom\",11")); } @Test @@ -82,9 +86,10 @@ public void testVertexDateValue() { VidTypeEnum.STRING, schema); - String value = helper.createValue(row, null); - assert (("\"2\": (\"Tom\",date(\"2020-01-01\"),datetime(\"2020-01-01 12:12:12:0000\")," - + "time(\"12:12:12:0000\"),11)").equals(value)); + NebulaVertex vertex = helper.createVertex(row, null); + assert (vertex.getVid().equals("\"2\"")); + assert (vertex.getPropValuesString().equals("\"Tom\",date(\"2020-01-01\"),datetime" + + "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\"),11")); } @Test @@ -101,8 +106,10 @@ public void testIntVidVertex() { VidTypeEnum.INT, schema); - String value = helper.createValue(row, PolicyEnum.HASH); - assert ("HASH(\"Tom\"): (\"Tom\",11)".equals(value)); + NebulaVertex vertex = helper.createVertex(row, PolicyEnum.HASH); + assert (vertex.getVid().equals("Tom")); + assert (vertex.getPropValues().size() == 2); + assert (vertex.getPropValuesString().equals("\"Tom\",11")); } @@ -122,8 +129,12 @@ public void testCreateEdgeValue() { new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) rowInfoConfig, VidTypeEnum.STRING, schema); - String value = helper.createValue(row, null); - assert ("\"Tom\"->\"Jena\"@2: (\"Tom\",\"Jena\",12.0)".equals(value)); + NebulaEdge edge = helper.createEdge(row, null); + assert (edge.getSource().equals("\"Tom\"")); + assert (edge.getTarget().equals("\"Jena\"")); + assert (edge.getRank() == 2); + assert (edge.getPropValues().size() == 3); + 
assert (edge.getPropValuesString().equals("\"Tom\",\"Jena\",12.0")); } @@ -143,9 +154,13 @@ public void testEdgeDateValue() { VidTypeEnum.STRING, schema); - String value = helper.createValue(row, null); - assert (("\"Tom\"->\"Jena\": (12.0,date(\"2020-01-01\"),datetime(\"2020-01-01 " - + "12:12:12:0000\"),time(\"12:12:12:0000\"))").equals(value)); + NebulaEdge edge = helper.createEdge(row, null); + assert (edge.getSource().equals("\"Tom\"")); + assert (edge.getTarget().equals("\"Jena\"")); + assert (edge.getRank() == null); + assert (edge.getPropValues().size() == 4); + assert (edge.getPropValuesString().equals("12.0,date(\"2020-01-01\"),datetime" + + "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\")")); } @Test @@ -164,7 +179,11 @@ public void testIntVidEdge() { VidTypeEnum.INT, schema); - String value = helper.createValue(row, PolicyEnum.HASH); - assert ("HASH(\"Tom\")->HASH(\"Jena\"): (12.0)".equals(value)); + NebulaEdge edge = helper.createEdge(row, PolicyEnum.HASH); + assert(edge.getSource().equals("HASH(\"Tom\")")); + assert(edge.getTarget().equals("HASH(\"Jena\")")); + assert(edge.getRank() == null); + assert(edge.getPropValues().size() == 1); + assert(edge.getPropValuesString().equals("12.0")); } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java new file mode 100644 index 0000000..d115fe3 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java @@ -0,0 +1,4 @@ +import junit.framework.TestCase; +public class NebulaRowEdgeOutputFormatConverterTest extends TestCase { + +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java new 
file mode 100644 index 0000000..405a62c --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java @@ -0,0 +1,4 @@ +import junit.framework.TestCase; +public class NebulaRowVertexOutputFormatConverterTest extends TestCase { + +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java new file mode 100644 index 0000000..cd8e9eb --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java @@ -0,0 +1,4 @@ +import junit.framework.TestCase; +public class NebulaVertexBatchExecutorTest extends TestCase { + +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java new file mode 100644 index 0000000..ef58817 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.utils;public class NebulaEdgesTest { +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java new file mode 100644 index 0000000..2d745eb --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula.utils;public class NebulaVerticesTest { +} diff --git a/connector/src/test/resources/log4j.properties b/connector/src/test/resources/log4j.properties new file mode 100644 index 0000000..913391d --- /dev/null +++ b/connector/src/test/resources/log4j.properties @@ -0,0 +1,6 @@ +# Global logging configuration +log4j.rootLogger=INFO, stdout +# Console output... +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n diff --git a/example/pom.xml b/example/pom.xml index 1c89424..dac169a 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -16,7 +16,7 @@ com.vesoft nebula-flink-connector - 2.0.0 + 2.0-SNAPSHOT From 1bc986df99aca3f884b96e56591611856ee3efb4 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 16 Sep 2021 15:58:54 +0800 Subject: [PATCH 2/6] support UPDATE and DELETE write mode --- .../nebula/sink/NebulaBatchExecutor.java | 57 +----- .../nebula/sink/NebulaBatchOutputFormat.java | 37 ++-- .../nebula/sink/NebulaEdgeBatchExecutor.java | 88 +++++++- .../NebulaRowEdgeOutputFormatConverter.java | 33 +-- .../NebulaRowVertexOutputFormatConverter.java | 17 +- .../sink/NebulaVertexBatchExecutor.java | 90 ++++++++- .../statement/EdgeExecutionOptions.java | 13 +- .../nebula/statement/ExecutionOptions.java | 18 +- .../statement/VertexExecutionOptions.java | 13 +- .../nebula/utils/NebulaConstant.java | 12 +- .../connector/nebula/utils/NebulaEdge.java | 52 ++++- .../connector/nebula/utils/NebulaEdges.java | 188 +++++++++++++++++- .../connector/nebula/utils/NebulaVertex.java | 26 ++- .../nebula/utils/NebulaVertices.java | 138 ++++++++++++- .../connector/nebula/utils/WriteModeEnum.java | 28 ++- 15 files changed, 688 insertions(+), 122 deletions(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java 
b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java index 886a8ae..1e0288f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchExecutor.java @@ -17,20 +17,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NebulaBatchExecutor { - private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchExecutor.class); +public abstract class NebulaBatchExecutor { - private final ExecutionOptions executionOptions; - private final NebulaBufferedRow nebulaBufferedRow; - private final boolean isVertex; - private final Map schema; - private final VidTypeEnum vidType; + protected final ExecutionOptions executionOptions; + protected final Map schema; + protected final VidTypeEnum vidType; - public NebulaBatchExecutor(ExecutionOptions executionOptions, boolean isVertex, + public NebulaBatchExecutor(ExecutionOptions executionOptions, VidTypeEnum vidType, Map schema) { this.executionOptions = executionOptions; - this.nebulaBufferedRow = new NebulaBufferedRow(); - this.isVertex = isVertex; this.vidType = vidType; this.schema = schema; } @@ -40,50 +35,12 @@ public NebulaBatchExecutor(ExecutionOptions executionOptions, boolean isVertex, * * @param record represent vertex or edge */ - void addToBatch(T record) { - NebulaOutputFormatConverter converter; - if (isVertex) { - converter = new NebulaRowVertexOutputFormatConverter( - (VertexExecutionOptions) executionOptions, vidType, schema); - } else { - converter = new NebulaRowEdgeOutputFormatConverter( - (EdgeExecutionOptions) executionOptions, vidType, schema); - } - String value = converter.createValue(record, executionOptions.getPolicy()); - if (value == null) { - return; - } - nebulaBufferedRow.putRow(value); - } + abstract void addToBatch(T record); /** * execute the insert statement * * @param session graph session */ - String 
executeBatch(Session session) { - String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields()); - String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows()); - String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, - executionOptions.getDataType(), executionOptions.getLabel(), propNames, values); - LOG.info("insert statement={}", exec); - ResultSet execResult = null; - try { - execResult = session.execute(exec); - } catch (Exception e) { - LOG.error("insert error:", e); - nebulaBufferedRow.clean(); - return exec; - } - - if (execResult.isSucceeded()) { - LOG.debug("insert success"); - } else { - LOG.error("insert failed: {}", execResult.getErrorMessage()); - nebulaBufferedRow.clean(); - return exec; - } - nebulaBufferedRow.clean(); - return null; - } + abstract String executeBatch(Session session); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index 4dc5434..0aa720d 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -62,6 +62,18 @@ public void open(int i, int i1) throws IOException { LOG.error("failed to get graph session, ", e); throw new IOException("get graph session error, ", e); } + ResultSet resultSet; + try { + resultSet = session.execute("USE " + executionOptions.getGraphSpace()); + } catch (IOErrorException e) { + LOG.error("switch space error, ", e); + throw new IOException("switch space error,", e); + } + if (!resultSet.isSucceeded()) { + LOG.error("switch space failed, {}", resultSet.getErrorMessage()); + throw new RuntimeException("switch space failed, " + resultSet.getErrorMessage()); + } + try { metaClient = metaProvider.getMetaClient(); } catch (TException e) { @@ -77,11 +89,12 @@ public void 
open(int i, int i1) throws IOException { if (isVertex) { schema = metaProvider.getTagSchema(metaClient, executionOptions.getGraphSpace(), executionOptions.getLabel()); + nebulaBatchExecutor = new NebulaVertexBatchExecutor(executionOptions, vidType, schema); } else { schema = metaProvider.getEdgeSchema(metaClient, executionOptions.getGraphSpace(), executionOptions.getLabel()); + nebulaBatchExecutor = new NebulaEdgeBatchExecutor(executionOptions, vidType, schema); } - nebulaBatchExecutor = new NebulaBatchExecutor(executionOptions, isVertex, vidType, schema); } /** @@ -100,24 +113,12 @@ public final synchronized void writeRecord(T row) throws IOException { * commit batch insert statements */ private synchronized void commit() throws IOException { - ResultSet resultSet; - try { - resultSet = session.execute("USE " + executionOptions.getGraphSpace()); - } catch (IOErrorException e) { - LOG.error("switch space error, ", e); - throw new IOException("switch space error,", e); - } - - if (resultSet.isSucceeded()) { - String errorExec = nebulaBatchExecutor.executeBatch(session); - if (errorExec != null) { - errorBuffer.add(errorExec); - } - long pendingRow = numPendingRow.get(); - numPendingRow.compareAndSet(pendingRow, 0); - } else { - LOG.error("switch space failed, ", resultSet.getErrorMessage()); + String errorExec = nebulaBatchExecutor.executeBatch(session); + if (errorExec != null) { + errorBuffer.add(errorExec); } + long pendingRow = numPendingRow.get(); + numPendingRow.compareAndSet(pendingRow, 0); } /** diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index 2d83385..ef25f65 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -1,8 +1,90 @@ -/* Copyright (c) 2020 vesoft 
inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ - -package org.apache.flink.connector.nebula.sink;public class NebulaEdgeBatchExecutor { + +package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.net.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaEdge; +import org.apache.flink.connector.nebula.utils.NebulaEdges; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaEdgeBatchExecutor extends NebulaBatchExecutor { + private static final Logger LOG = LoggerFactory.getLogger(NebulaEdgeBatchExecutor.class); + private final List nebulaEdgeList; + + public NebulaEdgeBatchExecutor(ExecutionOptions executionOptions, + VidTypeEnum vidType, Map schema) { + super(executionOptions, vidType, schema); + nebulaEdgeList = new ArrayList<>(); + } + + /** + * put record into buffer + */ + @Override + void addToBatch(T record) { + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions, + vidType, schema); + NebulaEdge edge = converter.createEdge((Row) record, executionOptions.getPolicy()); + if (edge == null) { + return; + } + nebulaEdgeList.add(edge); + } + + @Override + String executeBatch(Session session) { + NebulaEdges nebulaEdges = new NebulaEdges(executionOptions.getLabel(), + executionOptions.getFields(), nebulaEdgeList, executionOptions.getPolicy(), + executionOptions.getPolicy()); + // generate 
the write ngql statement + String statement = null; + switch (executionOptions.getWriteMode()) { + case INSERT: + statement = nebulaEdges.getInsertStatement(); + break; + case UPDATE: + statement = nebulaEdges.getUpdateStatement(); + break; + case DELETE: + statement = nebulaEdges.getDeleteStatement(); + break; + default: + throw new IllegalArgumentException("write mode " + executionOptions.getWriteMode() + " is not supported"); + } + LOG.debug("write statement={}", statement); + + // execute ngql statement + ResultSet execResult = null; + try { + execResult = session.execute(statement); + } catch (Exception e) { + LOG.error("write data error, ", e); + nebulaEdgeList.clear(); + return statement; + } + + if (execResult.isSucceeded()) { + LOG.debug("write success"); + } else { + LOG.error("write data failed: {}", execResult.getErrorMessage()); + nebulaEdgeList.clear(); + return statement; + } + nebulaEdgeList.clear(); + return null; + } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java index 3a4e92f..df07820 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java @@ -7,18 +7,20 @@ package org.apache.flink.connector.nebula.sink; import com.esotericsoftware.minlog.Log; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.utils.NebulaConstant; +import org.apache.flink.connector.nebula.utils.NebulaEdge; import org.apache.flink.connector.nebula.utils.NebulaUtils; import org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.VidTypeEnum; import 
org.apache.flink.types.Row; -public class NebulaRowEdgeOutputFormatConverter implements NebulaOutputFormatConverter { +public class NebulaRowEdgeOutputFormatConverter implements Serializable { private final int srcIdIndex; private final int dstIdIndex; @@ -44,18 +46,19 @@ public NebulaRowEdgeOutputFormatConverter(EdgeExecutionOptions executionOptions, } } - @Override - public String createValue(Row row, PolicyEnum policy) { + + public NebulaEdge createEdge(Row row, PolicyEnum policy) { + // check row data if (row == null || row.getArity() == 0) { Log.error("empty row"); return null; } - Object srcId = row.getField(srcIdIndex); Object dstId = row.getField(dstIdIndex); if (srcId == null || dstId == null) { return null; } + // extract edge properties List edgeProps = new ArrayList<>(); for (int i : positions) { String propName = pos2Field.get(i); @@ -63,6 +66,7 @@ public String createValue(Row row, PolicyEnum policy) { edgeProps.add(NebulaUtils.extraValue(row.getField(i), type)); } + // format edge source id and target id String srcFormatId = srcId.toString(); String dstFormatId = dstId.toString(); @@ -76,20 +80,19 @@ public String createValue(Row row, PolicyEnum policy) { } } else { assert (vidType == VidTypeEnum.INT); - srcFormatId = String.format(NebulaConstant.ENDPOINT_TEMPLATE, policy.policy(), - srcId.toString()); - dstFormatId = String.format(NebulaConstant.ENDPOINT_TEMPLATE, policy.policy(), - dstId.toString()); } + // extract edge rank + Long rank = null; if (rankIndex >= 0) { - assert row.getField(rankIndex) != null; - Long rank = Long.parseLong(row.getField(rankIndex).toString()); - return String.format(NebulaConstant.EDGE_VALUE_TEMPLATE, srcFormatId, dstFormatId, - rank, String.join(NebulaConstant.COMMA, edgeProps)); - } else { - return String.format(NebulaConstant.EDGE_VALUE_WITHOUT_RANKING_TEMPLATE, srcFormatId, - dstFormatId, String.join(NebulaConstant.COMMA, edgeProps)); + if (row.getField(rankIndex) == null) { + rank = 0L; + } else { + rank = 
Long.parseLong(row.getField(rankIndex).toString()); + } } + + NebulaEdge edge = new NebulaEdge(srcFormatId, dstFormatId, rank, edgeProps); + return edge; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java index 301d752..1c2c019 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java @@ -7,6 +7,7 @@ package org.apache.flink.connector.nebula.sink; import com.esotericsoftware.minlog.Log; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -14,11 +15,12 @@ import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.NebulaUtils; +import org.apache.flink.connector.nebula.utils.NebulaVertex; import org.apache.flink.connector.nebula.utils.PolicyEnum; import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.apache.flink.types.Row; -public class NebulaRowVertexOutputFormatConverter implements NebulaOutputFormatConverter { +public class NebulaRowVertexOutputFormatConverter implements Serializable { private static final long serialVersionUID = -7728344698410737677L; @@ -44,8 +46,8 @@ public NebulaRowVertexOutputFormatConverter(VertexExecutionOptions executionOpti } - @Override - public String createValue(Row row, PolicyEnum policy) { + public NebulaVertex createVertex(Row row, PolicyEnum policy) { + // check row data if (row == null || row.getArity() == 0) { Log.error("empty row"); return null; @@ -55,6 +57,7 @@ public String createValue(Row row, PolicyEnum policy) { Log.error("wrong id, your id is null "); return null; } + // extract 
vertex properties List vertexProps = new ArrayList<>(); for (int i : positions) { String propName = pos2Field.get(i); @@ -66,8 +69,8 @@ public String createValue(Row row, PolicyEnum policy) { vertexProps.add(NebulaUtils.extraValue(row.getField(i), type)); } + // format vertex id String formatId = String.valueOf(id); - if (policy == null) { if (vidType == VidTypeEnum.STRING) { formatId = NebulaUtils.mkString(NebulaUtils.escapeUtil(String.valueOf(formatId)), @@ -75,12 +78,10 @@ public String createValue(Row row, PolicyEnum policy) { } else { assert (NebulaUtils.isNumeric(formatId)); } - return String.format(NebulaConstant.VERTEX_VALUE_TEMPLATE, formatId, - String.join(",", vertexProps)); } else { assert (vidType == VidTypeEnum.INT); - return String.format(NebulaConstant.VERTEX_VALUE_TEMPLATE_WITH_POLICY, - policy.policy(), formatId, String.join(",", vertexProps)); } + NebulaVertex vertex = new NebulaVertex(formatId, vertexProps); + return vertex; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index baa4d18..5e08e9a 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -1,8 +1,92 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.sink;public class NebulaVertexBatchExecutor { + +package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.net.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaVertex; +import org.apache.flink.connector.nebula.utils.NebulaVertices; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaVertexBatchExecutor extends NebulaBatchExecutor { + private static final Logger LOG = LoggerFactory.getLogger(NebulaVertexBatchExecutor.class); + + private final List nebulaVertexList; + + public NebulaVertexBatchExecutor(ExecutionOptions executionOptions, + VidTypeEnum vidType, Map schema) { + super(executionOptions, vidType, schema); + nebulaVertexList = new ArrayList<>(); + } + + /** + * put record into buffer + * + * @param record represent vertex or edge + */ + @Override + void addToBatch(T record) { + NebulaRowVertexOutputFormatConverter converter = new NebulaRowVertexOutputFormatConverter( + (VertexExecutionOptions) executionOptions, vidType, schema); + NebulaVertex vertex = converter.createVertex((Row) record, executionOptions.getPolicy()); + if (vertex == null) { + return; + } + nebulaVertexList.add(vertex); + } + + @Override + String executeBatch(Session session) { + NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(), + executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy()); + // generate the write ngql statement + String statement = null; + switch (executionOptions.getWriteMode()) { + case INSERT: + statement = 
nebulaVertices.getInsertStatement(); + break; + case UPDATE: + statement = nebulaVertices.getUpdateStatement(); + break; + case DELETE: + statement = nebulaVertices.getDeleteStatement(); + break; + default: + throw new IllegalArgumentException("write mode is not supported"); + } + LOG.debug("write statement={}", statement); + + // execute ngql statement + ResultSet execResult = null; + try { + execResult = session.execute(statement); + } catch (Exception e) { + LOG.error("write data error, ", e); + nebulaVertexList.clear(); + return statement; + } + + if (execResult.isSucceeded()) { + LOG.debug("write success"); + } else { + LOG.error("write data failed: {}", execResult.getErrorMessage()); + nebulaVertexList.clear(); + return statement; + } + nebulaVertexList.clear(); + return null; + } + } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java index dd7d753..8581ad6 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/EdgeExecutionOptions.java @@ -13,6 +13,7 @@ import java.util.List; import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; public class EdgeExecutionOptions extends ExecutionOptions { @@ -40,9 +41,10 @@ public class EdgeExecutionOptions extends ExecutionOptions { private EdgeExecutionOptions(String graphSpace, String executeStatement, List fields, List positions, boolean noColumn, int limit, long startTime, long endTime, long batch, PolicyEnum policy, + WriteModeEnum mode, String edge, int srcIndex, int dstIndex, int rankIndex) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, - endTime, batch, policy); + endTime, batch, 
policy, mode); this.edge = edge; this.srcIndex = srcIndex; this.dstIndex = dstIndex; @@ -87,6 +89,7 @@ public static class ExecutionOptionBuilder { private long endTime = Long.MAX_VALUE; private int batch = DEFAULT_WRITE_BATCH; private PolicyEnum policy = null; + private WriteModeEnum mode = WriteModeEnum.INSERT; private int srcIndex = DEFAULT_ROW_INFO_INDEX; private int dstIndex = DEFAULT_ROW_INFO_INDEX; private int rankIndex = DEFAULT_ROW_INFO_INDEX; @@ -163,6 +166,11 @@ public ExecutionOptionBuilder setRankIndex(int rankIndex) { return this; } + public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) { + this.mode = mode; + return this; + } + public ExecutionOptions builder() { if (graphSpace == null || graphSpace.trim().isEmpty()) { throw new IllegalArgumentException("graph space can not be empty."); @@ -171,7 +179,8 @@ public ExecutionOptions builder() { throw new IllegalArgumentException("edge can not be empty."); } return new EdgeExecutionOptions(graphSpace, executeStatement, fields, positions, - noColumn, limit, startTime, endTime, batch, policy, edge, srcIndex, dstIndex, + noColumn, limit, startTime, endTime, batch, policy, mode, edge, srcIndex, + dstIndex, rankIndex); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java index a3b3fd4..6be81a0 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/ExecutionOptions.java @@ -10,6 +10,7 @@ import java.util.List; import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; import org.apache.flink.types.Row; /** @@ -126,6 +127,11 @@ public abstract class ExecutionOptions implements Serializable { */ private PolicyEnum 
policy; + /** + * write mode + */ + private WriteModeEnum writeMode; + protected ExecutionOptions(String graphSpace, String executeStatement, @@ -136,7 +142,8 @@ protected ExecutionOptions(String graphSpace, long startTime, long endTime, long batch, - PolicyEnum policy) { + PolicyEnum policy, + WriteModeEnum writeMode) { this.graphSpace = graphSpace; this.executeStatement = executeStatement; @@ -148,6 +155,7 @@ protected ExecutionOptions(String graphSpace, this.endTime = endTime; this.batch = batch; this.policy = policy; + this.writeMode = writeMode; } public String getGraphSpace() { @@ -194,6 +202,10 @@ public PolicyEnum getPolicy() { public abstract DataTypeEnum getDataType(); + public WriteModeEnum getWriteMode() { + return writeMode; + } + @Override public String toString() { return "ExecutionOptions{" @@ -206,8 +218,8 @@ public String toString() { + ", startTime=" + startTime + ", endTime=" + endTime + ", batch=" + batch - + ", policy=" - + policy + + ", policy=" + policy + + ", mode=" + writeMode + '}'; } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java index 75a02e8..683874f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java @@ -13,6 +13,7 @@ import java.util.List; import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.PolicyEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; public class VertexExecutionOptions extends ExecutionOptions { @@ -36,10 +37,11 @@ public VertexExecutionOptions(String graphSpace, long endTime, long batch, PolicyEnum policy, + WriteModeEnum mode, String tag, int idIndex) { super(graphSpace, executeStatement, fields, positions, noColumn, limit, 
startTime, - endTime, batch, policy); + endTime, batch, policy, mode); this.tag = tag; this.idIndex = idIndex; } @@ -70,6 +72,7 @@ public static class ExecutionOptionBuilder { private long endTime = Long.MAX_VALUE; private int batch = DEFAULT_WRITE_BATCH; private PolicyEnum policy = null; + private WriteModeEnum mode = WriteModeEnum.INSERT; private int idIndex = DEFAULT_ROW_INFO_INDEX; public ExecutionOptionBuilder setGraphSpace(String graphSpace) { @@ -138,6 +141,11 @@ public ExecutionOptionBuilder setIdIndex(int idIndex) { return this; } + public ExecutionOptionBuilder setWriteMode(WriteModeEnum mode) { + this.mode = mode; + return this; + } + public ExecutionOptions builder() { if (graphSpace == null || graphSpace.trim().isEmpty()) { throw new IllegalArgumentException("graph space can not be empty."); @@ -146,7 +154,8 @@ public ExecutionOptions builder() { throw new IllegalArgumentException("tag can not be empty."); } return new VertexExecutionOptions(graphSpace, executeStatement, fields, - positions, noColumn, limit, startTime, endTime, batch, policy, tag, idIndex); + positions, noColumn, limit, startTime, endTime, batch, policy, mode, tag, + idIndex); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java index 089f8ca..17060bf 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java @@ -8,13 +8,23 @@ public class NebulaConstant { // template for insert statement - public static String BATCH_INSERT_TEMPLATE = "INSERT %s %s(%s) VALUES %s"; + public static String BATCH_INSERT_TEMPLATE = "INSERT %s `%s`(%s) VALUES %s"; public static String VERTEX_VALUE_TEMPLATE = "%s: (%s)"; public static String VERTEX_VALUE_TEMPLATE_WITH_POLICY = "%s(\"%s\"): (%s)"; public static String ENDPOINT_TEMPLATE = "%s(\"%s\")"; 
public static String EDGE_VALUE_WITHOUT_RANKING_TEMPLATE = "%s->%s: (%s)"; public static String EDGE_VALUE_TEMPLATE = "%s->%s@%d: (%s)"; + // template for update statement + public static String UPDATE_VERTEX_TEMPLATE = "UPDATE %s ON `%s` %s SET %s"; + public static String UPDATE_EDGE_TEMPLATE = "UPDATE %s ON `%s` %s->%s@%d SET %s"; + public static String UPDATE_VALUE_TEMPLATE = "`%s`=%s"; + + // template for delete statement + public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"; + public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"; + public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"; + // Delimiter public static String COMMA = ","; public static String SUB_LINE = "_"; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java index a0ac859..a84a2a1 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdge.java @@ -1,8 +1,54 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.utils;public class NebulaEdge { + +package org.apache.flink.connector.nebula.utils; + +import java.io.Serializable; +import java.util.List; + +public class NebulaEdge implements Serializable { + private String source; + private String target; + private Long rank; + private List propValues; + + public NebulaEdge(String source, String target, Long rank, List propValues) { + this.source = source; + this.target = target; + this.rank = rank; + this.propValues = propValues; + } + + public String getSource() { + return source; + } + + public String getTarget() { + return target; + } + + public Long getRank() { + return rank; + } + + public List getPropValues() { + return propValues; + } + + public String getPropValuesString() { + return String.join(",", propValues); + } + + @Override + public String toString() { + return "NebulaEdge{" + + "source='" + source + '\'' + + ", target='" + target + '\'' + + ", rank=" + rank + + ", propValues=" + propValues + + '}'; + } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java index ecd2d79..3b0cb39 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaEdges.java @@ -1,8 +1,190 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.utils;public class NebulaEdges { + +package org.apache.flink.connector.nebula.utils; + +import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_EDGE_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.EDGE_ENDPOINT_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.EDGE_VALUE_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.EDGE_VALUE_WITHOUT_RANKING_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_EDGE_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class NebulaEdges implements Serializable { + + private String edgeType; + private List propNames; + private List edges; + private PolicyEnum sourcePolicy = null; + private PolicyEnum targetPolicy = null; + + public NebulaEdges(String edgeType, List propNames, List edges, + PolicyEnum sourcePolicy, PolicyEnum targetPolicy) { + this.edgeType = edgeType; + this.propNames = propNames; + this.edges = edges; + this.sourcePolicy = sourcePolicy; + this.targetPolicy = targetPolicy; + } + + public String getEdgeType() { + return edgeType; + } + + public String getPropNames() { + List escapePropNames = new ArrayList<>(); + for (String propName : propNames) { + escapePropNames.add(NebulaUtils.mkString(propName, "`", "", "`")); + } + return String.join(",", escapePropNames); + } + + public List getEdges() { + return edges; + } + + public PolicyEnum getSourcePolicy() { + return sourcePolicy; + } + + public PolicyEnum getTargetPolicy() { + return targetPolicy; + } + + /** + * construct Nebula batch insert ngql for 
edges + * + * @return ngql + */ + public String getInsertStatement() { + List values = new ArrayList<>(); + for (NebulaEdge edge : edges) { + String sourceId = getSourceId(edge); + String targetId = getTargetId(edge); + + // edge rank + if (edge.getRank() == null) { + values.add(String.format(EDGE_VALUE_WITHOUT_RANKING_TEMPLATE, sourceId, targetId, + edge.getPropValuesString())); + } else { + values.add(String.format(EDGE_VALUE_TEMPLATE, sourceId, targetId, edge.getRank(), + edge.getPropValuesString())); + } + } + return String.format(BATCH_INSERT_TEMPLATE, DataTypeEnum.EDGE.name(), edgeType, + getPropNames(), String.join(",", values)); + } + + /** + * construct Nebula batch update ngql for edge + * + * @return ngql + */ + public String getUpdateStatement() { + List statements = new ArrayList<>(); + // for update mode, each vertex construct one update statement. + for (NebulaEdge edge : edges) { + String sourceId = getSourceId(edge); + String targetId = getTargetId(edge); + long rank = 0; + if (edge.getRank() != null) { + rank = edge.getRank(); + } + + List updateProps = new ArrayList<>(); + for (int i = 0; i < propNames.size(); i++) { + updateProps.add(String.format(UPDATE_VALUE_TEMPLATE, propNames.get(i), + edge.getPropValues().get(i))); + } + String updatePropsString = String.join(",", updateProps); + String statement = String.format(UPDATE_EDGE_TEMPLATE, DataTypeEnum.EDGE.name(), + edgeType, sourceId, targetId, rank, updatePropsString); + statements.add(statement); + } + return String.join(";", statements); + } + + /** + * construct Nebula batch delete ngql for edge + * + * @return ngql + */ + public String getDeleteStatement() { + List sourceTargetIds = new ArrayList<>(); + for (NebulaEdge edge : edges) { + String sourceId = getSourceId(edge); + String targetId = getTargetId(edge); + long rank = 0; + if (edge.getRank() != null) { + rank = edge.getRank(); + } + String statement = String.format(EDGE_ENDPOINT_TEMPLATE, sourceId, targetId, rank); + 
sourceTargetIds.add(statement); + } + return String.format(DELETE_EDGE_TEMPLATE, edgeType, String.join(",", sourceTargetIds)); + } + + /** + * format edge source id with policy + * + * @param edge Nebula edge {@link NebulaEdge} + * @return the formatted source id + */ + private String getSourceId(NebulaEdge edge) { + String sourceId = null; + if (sourcePolicy == null) { + sourceId = edge.getSource(); + } else { + switch (sourcePolicy) { + case HASH: + sourceId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.HASH.name(), + edge.getSource()); + break; + case UUID: + sourceId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.UUID.name(), + edge.getSource()); + break; + default: + throw new IllegalArgumentException("source policy is not supported"); + } + } + return sourceId; + } + + /** + * format edge target id with policy + * + * @param edge Nebula edge {@link NebulaEdge} + * @return the formatted target id + */ + private String getTargetId(NebulaEdge edge) { + String targetId = null; + if (targetPolicy == null) { + targetId = edge.getTarget(); + } else { + switch (targetPolicy) { + case HASH: + targetId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.HASH.name(), + edge.getTarget()); + break; + case UUID: + targetId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.UUID.name(), + edge.getTarget()); + break; + default: + throw new IllegalArgumentException("target policy is not supported"); + } + } + return targetId; + } + } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java index 38e6a4b..942a823 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertex.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. 
* * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. @@ -8,13 +8,17 @@ import java.io.Serializable; import java.util.List; -import java.util.stream.Collectors; -public class NebulaVertices implements Serializable { +public class NebulaVertex implements Serializable { private String vid; private List propValues; + public NebulaVertex(String vid, List propValues) { + this.vid = vid; + this.propValues = propValues; + } + public String getVid() { return vid; } @@ -23,11 +27,23 @@ public void setVid(String vid) { this.vid = vid; } - public String getPropValues() { - return propValues.stream().collect(Collectors.joining(",")); + public String getPropValuesString() { + return String.join(",", propValues); + } + + public List getPropValues() { + return propValues; } public void setPropValues(List propValues) { this.propValues = propValues; } + + @Override + public String toString() { + return "NebulaVertex{" + + "vid='" + vid + '\'' + + ", propValues=" + propValues + + '}'; + } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java index 86940f8..2c385b9 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java @@ -1,8 +1,140 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.utils;public class NebulaVertices { + +package org.apache.flink.connector.nebula.utils; + +import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.VERTEX_VALUE_TEMPLATE; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class NebulaVertices implements Serializable { + + private String tagName; + private List propNames; + private List vertices; + private PolicyEnum policy = null; + + public NebulaVertices(String tagName, List propNames, List vertices, + PolicyEnum policy) { + this.tagName = tagName; + this.propNames = propNames; + this.vertices = vertices; + this.policy = policy; + } + + public String getPropNames() { + List escapePropNames = new ArrayList<>(); + for (String propName : propNames) { + escapePropNames.add(NebulaUtils.mkString(propName, "`", "", "`")); + } + return String.join(",", escapePropNames); + } + + public void setPropNames(List propNames) { + this.propNames = propNames; + } + + public List getVertices() { + return vertices; + } + + public void setVertices(List vertices) { + this.vertices = vertices; + } + + public PolicyEnum getPolicy() { + return policy; + } + + public void setPolicy(PolicyEnum policy) { + this.policy = policy; + } + + /** + * construct Nebula batch insert ngql for vertex + * + * @return ngql + */ + public String getInsertStatement() { + List values = new ArrayList<>(); + for (NebulaVertex vertex : vertices) { + String vertexId = getVertexId(vertex); + 
values.add(String.format(VERTEX_VALUE_TEMPLATE, vertexId, + vertex.getPropValuesString())); + } + return String.format(BATCH_INSERT_TEMPLATE, DataTypeEnum.VERTEX.name(), tagName, + getPropNames(), String.join(",", values)); + } + + /** + * construct Nebula batch update ngql for vertex + * + * @return ngql + */ + public String getUpdateStatement() { + List statements = new ArrayList<>(); + // for update mode, each vertex construct one update statement. + for (NebulaVertex vertex : vertices) { + String vertexId = getVertexId(vertex); + + List updateProps = new ArrayList<>(); + for (int i = 0; i < propNames.size(); i++) { + updateProps.add(String.format(UPDATE_VALUE_TEMPLATE, propNames.get(i), + vertex.getPropValues().get(i))); + } + String updatePropsString = String.join(",", updateProps); + String statement = String.format(UPDATE_VERTEX_TEMPLATE, DataTypeEnum.VERTEX.name(), + tagName, vertexId, updatePropsString); + statements.add(statement); + } + return String.join(";", statements); + } + + /** + * construct Nebula batch delete ngql for vertex + * + * @return ngql + */ + public String getDeleteStatement() { + List vertexIds = new ArrayList<>(); + for (NebulaVertex vertex : vertices) { + String vertexId = getVertexId(vertex); + vertexIds.add(vertexId); + } + return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds)); + } + + /** + * format vertex id with policy + */ + private String getVertexId(NebulaVertex vertex) { + String vertexId = null; + if (policy == null) { + vertexId = vertex.getVid(); + } else { + switch (policy) { + case HASH: + vertexId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.HASH.name(), + vertex.getVid()); + break; + case UUID: + vertexId = String.format(ENDPOINT_TEMPLATE, PolicyEnum.UUID.name(), + vertex.getVid()); + break; + default: + throw new IllegalArgumentException("policy is not supported"); + } + } + return vertexId; + } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java 
b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java index 211eb4b..19fb6cc 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/WriteModeEnum.java @@ -1,8 +1,30 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ - -package org.apache.flink.connector.nebula.utils;public class WriteMode { + +package org.apache.flink.connector.nebula.utils; + +public enum WriteModeEnum { + /** + * INSERT write mode + */ + INSERT("insert"), + + /** + * UPDATE write mode + */ + UPDATE("update"), + + /** + * DELETE write mode + */ + DELETE("delete"); + + private String mode; + + WriteModeEnum(String mode) { + this.mode = mode; + } } From 2f47366ad46b197d32400915e1e2a36c7de08cea Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 16 Sep 2021 15:59:14 +0800 Subject: [PATCH 3/6] add test for sink nebula --- .../flink/connector/nebula/MockData.java | 82 +++- .../sink/AbstractNebulaOutPutFormatTest.java | 3 +- .../sink/NebulaEdgeBatchExecutorTest.java | 369 +++++++++++++++++- .../sink/NebulaOutputFormatConverterTest.java | 26 +- ...ebulaRowEdgeOutputFormatConverterTest.java | 141 ++++++- ...ulaRowVertexOutputFormatConverterTest.java | 103 ++++- .../sink/NebulaVertexBatchExecutorTest.java | 365 ++++++++++++++++- .../nebula/utils/NebulaEdgesTest.java | 159 +++++++- .../nebula/utils/NebulaVerticesTest.java | 114 +++++- 9 files changed, 1327 insertions(+), 35 deletions(-) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java index f0ae8ad..de5fb4d 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java +++ 
b/connector/src/test/java/org/apache/flink/connector/nebula/MockData.java @@ -3,6 +3,84 @@ * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ - -package org.apache.flink.connector.nebula;public class MockData { + +package org.apache.flink.connector.nebula; + +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.AuthFailedException; +import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.graph.exception.NotValidConnectionException; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockData { + private static final Logger LOGGER = + LoggerFactory.getLogger(MockData.class); + + public static void mockSchema() { + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + List addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669)); + NebulaPool pool = new NebulaPool(); + Session session = null; + try { + pool.init(addresses, nebulaPoolConfig); + session = pool.getSession("root", "nebula", true); + + ResultSet respStringSpace = session.execute(createStringSpace()); + ResultSet respIntSpace = session.execute(createIntSpace()); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + + if (!respStringSpace.isSucceeded()) { + LOGGER.error("create string vid type space failed, {}", + respStringSpace.getErrorMessage()); + assert (false); + } + if (!respIntSpace.isSucceeded()) { + LOGGER.error("create int vid type space failed, {}", + respIntSpace.getErrorMessage()); + assert (false); + } + } catch 
(UnknownHostException | NotValidConnectionException + | IOErrorException | AuthFailedException e) { + LOGGER.error("create space error, ", e); + assert (false); + } finally { + pool.close(); + } + } + + + private static String createStringSpace() { + String exec = "CREATE SPACE IF NOT EXISTS test_string(partition_num=10," + + "vid_type=fixed_string(8));" + + "USE test_string;" + + "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32," + + " col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);" + + "CREATE EDGE IF NOT EXISTS friend(col1 fixed_string(8), col2 string, col3 " + + "int32, col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);"; + return exec; + } + + private static String createIntSpace() { + String exec = "CREATE SPACE IF NOT EXISTS test_int(partition_num=10,vid_type=int64);" + + "USE test_int;" + + "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32," + + " col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);" + + "CREATE EDGE IF NOT EXISTS friend(col1 fixed_string(8), col2 string, col3 " + + "int32, col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);"; + return exec; + } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java index 2c0c4ba..e9242af 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/AbstractNebulaOutPutFormatTest.java @@ -4,7 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ -package org.apache.flink.sink; +package org.apache.flink.connector.nebula.sink; import com.vesoft.nebula.client.graph.NebulaPoolConfig; import com.vesoft.nebula.client.graph.data.HostAddress; @@ -18,7 +18,6 @@ import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.types.Row; diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java index e5dc885..34893be 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java @@ -1,4 +1,367 @@ -import junit.framework.TestCase; -public class NebulaEdgeBatchExecutorTest extends TestCase { - +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; +import com.vesoft.nebula.meta.PropertyType; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.connector.nebula.MockData; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; +import org.apache.flink.types.Row; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaEdgeBatchExecutorTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(NebulaEdgeBatchExecutorTest.class); + + String ip = "127.0.0.1"; + EdgeExecutionOptions.ExecutionOptionBuilder builder = null; + Map schema = new HashMap<>(); + Row row1 = new Row(10); + Row row2 = new Row(10); + + Session session = null; + + @Before + public void before() { + MockData.mockSchema(); + builder = new EdgeExecutionOptions.ExecutionOptionBuilder() + .setEdge("friend") + .setSrcIndex(0) + .setDstIndex(1) + .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7", + "col8")) + .setPositions(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9)); + + schema.put("col1", PropertyType.STRING.getValue()); + schema.put("col2", PropertyType.FIXED_STRING.getValue()); + schema.put("col3", PropertyType.INT32.getValue()); + schema.put("col4", PropertyType.DOUBLE.getValue()); + schema.put("col5", PropertyType.DATE.getValue()); + schema.put("col6", 
PropertyType.DATETIME.getValue()); + schema.put("col7", PropertyType.TIME.getValue()); + schema.put("col8", PropertyType.TIMESTAMP.getValue()); + + row1.setField(0, 1); + row1.setField(1, 2); + row1.setField(2, "Tom"); + row1.setField(3, "Tom"); + row1.setField(4, 10); + row1.setField(5, 1.0); + row1.setField(6, "2021-01-01"); + row1.setField(7, "2021-01-01T12:00:00"); + row1.setField(8, "12:00:00"); + row1.setField(9, 372435234); + + row2.setField(0, 2); + row2.setField(1, 3); + row2.setField(2, "Jina"); + row2.setField(3, "Jina"); + row2.setField(4, 20); + row2.setField(5, 2.0); + row2.setField(6, "2021-02-01"); + row2.setField(7, "2021-02-01T12:00:00"); + row2.setField(8, "15:00:00"); + row2.setField(9, 392435234); + + // get Session + NebulaPoolConfig poolConfig = new NebulaPoolConfig(); + NebulaPool pool = new NebulaPool(); + + try { + pool.init(Arrays.asList(new HostAddress(ip, 9669)), poolConfig); + session = pool.getSession("root", "nebula", true); + } catch (Exception e) { + LOGGER.error("init nebula pool error, ", e); + assert (false); + } + } + + /** + * test addToBatch for INSERT write mode + */ + @Test + public void testAddToBatchWithInsert() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for INSERT write mode + */ + @Test + public void testAddToBatchWithInsertPolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for UPDATE write mode + */ + @Test + public void testAddToBatchWithUpdate() { + ExecutionOptions options = builder + 
.setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for UPDATE write mode + */ + @Test + public void testAddToBatchWithUpdatePolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for DELETE write mode + */ + @Test + public void testAddToBatchWithDelete() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for DELETE write mode + */ + @Test + public void testAddToBatchWithDeletePolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + } + + /** + * test batch execute for int vid and insert mode + */ + @Test + public void testExecuteBatch() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + 
LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch exeucte for int vid and UPDATE mode + */ + @Test + public void testExecuteBatchWithUpdate() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch exeucte for int vid and DELETE mode + */ + @Test + public void testExecuteBatchWithDelete() { + ExecutionOptions options = builder.setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.INT, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** 
+ * test batch exeucte for string vid and insert mode + */ + @Test + public void testExecuteBatchWithStringVidAndInsert() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.STRING, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch execute for string vid and update mode + */ + @Test + public void testExecuteBatchWithStringVidAndUpdate() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaEdgeBatchExecutor edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.STRING, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch execute for string vid and DELETE mode + */ + @Test + public void testExecuteBatchWithStringVidAndDelete() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaEdgeBatchExecutor 
edgeBatchExecutor = + new NebulaEdgeBatchExecutor<>(options, VidTypeEnum.STRING, schema); + edgeBatchExecutor.addToBatch(row1); + edgeBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = edgeBatchExecutor.executeBatch(session); + assert (statement == null); + } + } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java index 25eb28d..979a5ab 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java @@ -4,7 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ -package org.apache.flink.sink; +package org.apache.flink.connector.nebula.sink; import com.vesoft.nebula.meta.PropertyType; import java.util.Arrays; @@ -67,9 +67,9 @@ public void testCreateVertexValue() { schema); NebulaVertex vertex = helper.createVertex(row, null); - assert(vertex.getVid().equals("\"2\"")); - assert(vertex.getPropValues().size() == 2); - assert(vertex.getPropValuesString().equals("\"Tom\",11")); + assert (vertex.getVid().equals("\"2\"")); + assert (vertex.getPropValues().size() == 2); + assert (vertex.getPropValuesString().equals("\"Tom\",11")); } @Test @@ -88,8 +88,8 @@ public void testVertexDateValue() { NebulaVertex vertex = helper.createVertex(row, null); assert (vertex.getVid().equals("\"2\"")); - assert (vertex.getPropValuesString().equals("\"Tom\",date(\"2020-01-01\"),datetime" + - "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\"),11")); + assert (vertex.getPropValuesString().equals("\"Tom\",date(\"2020-01-01\"),datetime" + + "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\"),11")); } @Test @@ -159,8 +159,8 @@ public void testEdgeDateValue() { assert (edge.getTarget().equals("\"Jena\"")); assert (edge.getRank() == null); assert (edge.getPropValues().size() == 4); - assert (edge.getPropValuesString().equals("12.0,date(\"2020-01-01\"),datetime" + - "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\")")); + assert (edge.getPropValuesString().equals("12.0,date(\"2020-01-01\"),datetime" + + "(\"2020-01-01 12:12:12:0000\"),time(\"12:12:12:0000\")")); } @Test @@ -180,10 +180,10 @@ public void testIntVidEdge() { schema); NebulaEdge edge = helper.createEdge(row, PolicyEnum.HASH); - assert(edge.getSource().equals("HASH(\"Tom\")")); - assert(edge.getTarget().equals("HASH(\"Jena\")")); - assert(edge.getRank() == null); - assert(edge.getPropValues().size() == 1); - assert(edge.getPropValuesString().equals("12.0")); + assert (edge.getSource().equals("HASH(\"Tom\")")); + assert (edge.getTarget().equals("HASH(\"Jena\")")); + assert 
(edge.getRank() == null); + assert (edge.getPropValues().size() == 1); + assert (edge.getPropValuesString().equals("12.0")); } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java index d115fe3..c8a6a10 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverterTest.java @@ -1,4 +1,139 @@ -import junit.framework.TestCase; -public class NebulaRowEdgeOutputFormatConverterTest extends TestCase { - +package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.meta.PropertyType; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaEdge; +import org.apache.flink.connector.nebula.utils.PolicyEnum; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.types.Row; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaRowEdgeOutputFormatConverterTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(NebulaRowEdgeOutputFormatConverterTest.class); + + EdgeExecutionOptions.ExecutionOptionBuilder builder = null; + Map schema = new HashMap<>(); + Row row = new Row(10); + + @Before + public void setUp() { + builder = new EdgeExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("test") + .setEdge("friend") + .setSrcIndex(0) + .setDstIndex(1) + .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7", + "col8")) + .setPositions(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 
9)); + + schema.put("col1", PropertyType.STRING.getValue()); + schema.put("col2", PropertyType.FIXED_STRING.getValue()); + schema.put("col3", PropertyType.INT32.getValue()); + schema.put("col4", PropertyType.DOUBLE.getValue()); + schema.put("col5", PropertyType.DATE.getValue()); + schema.put("col6", PropertyType.DATETIME.getValue()); + schema.put("col7", PropertyType.TIME.getValue()); + schema.put("col8", PropertyType.TIMESTAMP.getValue()); + + row.setField(0, 1); + row.setField(1, 2); + row.setField(2, "Tom"); + row.setField(3, "Tom"); + row.setField(4, 10); + row.setField(5, 1.0); + row.setField(6, "2021-01-01"); + row.setField(7, "2021-01-01T12:00:00"); + row.setField(8, "12:00:00"); + row.setField(9, 372435234); + } + + public void tearDown() { + } + + /** + * test create edge for int id + */ + @Test + public void testCreateEdgeIntId() { + ExecutionOptions options = builder.builder(); + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) options, + VidTypeEnum.INT, schema); + NebulaEdge edge = converter.createEdge(row, null); + assert (edge.getSource().equals("1")); + assert (edge.getTarget().equals("2")); + assert (edge.getRank() == null); + assert (edge.getPropValuesString().equals("\"Tom\",\"Tom\",10,1.0,date(\"2021-01-01\")," + + "datetime(\"2021-01-01T12:00:00\"),time(\"12:00:00\"),372435234")); + + } + + /** + * test create edge with rank for int id + */ + @Test + public void testCreateEdgeIntIdWithRank() { + ExecutionOptions options = builder.setRankIndex(4).builder(); + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) options, + VidTypeEnum.INT, schema); + NebulaEdge edge = converter.createEdge(row, null); + assert (edge.getSource().equals("1")); + assert (edge.getTarget().equals("2")); + assert (edge.getRank() == 10L); + } + + /** + * test create edge with policy for int id + */ + @Test + public void testCreateEdgeIntIdWithPolicy() { 
+ ExecutionOptions options = builder.builder(); + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) options, + VidTypeEnum.INT, schema); + NebulaEdge edge = converter.createEdge(row, PolicyEnum.HASH); + assert (edge.getSource().equals("1")); + assert (edge.getTarget().equals("2")); + assert (edge.getRank() == null); + } + + /** + * test create edge for String id + */ + @Test + public void testCreateEdgeStringId() { + ExecutionOptions options = builder.builder(); + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) options, + VidTypeEnum.STRING, schema); + NebulaEdge edge = converter.createEdge(row, null); + assert (edge.getSource().equals("\"1\"")); + assert (edge.getTarget().equals("\"2\"")); + assert (edge.getRank() == null); + } + + /** + * test create edge with rank for String id + */ + @Test + public void testCreateEdgeStringIdWithRank() { + ExecutionOptions options = builder.setRankIndex(4).builder(); + NebulaRowEdgeOutputFormatConverter converter = + new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) options, + VidTypeEnum.STRING, schema); + NebulaEdge edge = converter.createEdge(row, null); + assert (edge.getSource().equals("\"1\"")); + assert (edge.getTarget().equals("\"2\"")); + assert (edge.getRank() == 10L); + } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java index 405a62c..0839bac 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverterTest.java @@ -1,4 +1,101 @@ -import junit.framework.TestCase; -public class NebulaRowVertexOutputFormatConverterTest extends TestCase { - 
+package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.meta.PropertyType; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.NebulaVertex; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.types.Row; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaRowVertexOutputFormatConverterTest { + private static final Logger LOGGER = + LoggerFactory.getLogger(NebulaRowVertexOutputFormatConverterTest.class); + + VertexExecutionOptions.ExecutionOptionBuilder builder = null; + Map schema = new HashMap<>(); + Row row = new Row(9); + + @Before + public void setUp() { + builder = new VertexExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("test") + .setTag("person") + .setIdIndex(0) + .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7", + "col8")) + .setPositions(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); + + schema.put("col1", PropertyType.STRING.getValue()); + schema.put("col2", PropertyType.FIXED_STRING.getValue()); + schema.put("col3", PropertyType.INT32.getValue()); + schema.put("col4", PropertyType.DOUBLE.getValue()); + schema.put("col5", PropertyType.DATE.getValue()); + schema.put("col6", PropertyType.DATETIME.getValue()); + schema.put("col7", PropertyType.TIME.getValue()); + schema.put("col8", PropertyType.TIMESTAMP.getValue()); + + row.setField(0, 1); + row.setField(1, "Tom"); + row.setField(2, "Tom"); + row.setField(3, 10); + row.setField(4, 1.0); + row.setField(5, "2021-01-01"); + row.setField(6, "2021-01-01T12:00:00"); + row.setField(7, "12:00:00"); + row.setField(8, 372435234); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public 
void testCreateVertex() { + ExecutionOptions options = builder.builder(); + NebulaRowVertexOutputFormatConverter converter = + new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) options, + VidTypeEnum.INT, schema); + NebulaVertex vertex = converter.createVertex(row, null); + assert (vertex.getVid().equals("1")); + assert (vertex.getPropValuesString().equals("\"Tom\",\"Tom\",10,1.0,date(\"2021-01-01\")," + + "datetime(\"2021-01-01T12:00:00\"),time(\"12:00:00\"),372435234")); + } + + /** + * test create vertex with policy for int vid type + */ + @Test + public void testCreateVertexPolicy() { + ExecutionOptions options = builder.setPolicy("HASH").builder(); + NebulaRowVertexOutputFormatConverter converter = + new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) options, + VidTypeEnum.INT, schema); + NebulaVertex vertex = converter.createVertex(row, null); + assert (vertex.getVid().equals("1")); + assert (vertex.getPropValuesString().equals("\"Tom\",\"Tom\",10,1.0,date(\"2021-01-01\")," + + "datetime(\"2021-01-01T12:00:00\"),time(\"12:00:00\"),372435234")); + } + + /** + * test create vertex for string vid type + */ + @Test + public void testCreateVertexStringId() { + ExecutionOptions options = builder.builder(); + NebulaRowVertexOutputFormatConverter converter = + new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) options, + VidTypeEnum.STRING, schema); + NebulaVertex vertex = converter.createVertex(row, null); + assert (vertex.getVid().equals("\"1\"")); + assert (vertex.getPropValuesString().equals("\"Tom\",\"Tom\",10,1.0,date(\"2021-01-01\")," + + "datetime(\"2021-01-01T12:00:00\"),time(\"12:00:00\"),372435234")); + } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java index cd8e9eb..052fb6c 100644 --- 
a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java @@ -1,4 +1,363 @@ -import junit.framework.TestCase; -public class NebulaVertexBatchExecutorTest extends TestCase { - +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package org.apache.flink.connector.nebula.sink; + +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; +import com.vesoft.nebula.meta.PropertyType; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.connector.nebula.MockData; +import org.apache.flink.connector.nebula.statement.ExecutionOptions; +import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.VidTypeEnum; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; +import org.apache.flink.types.Row; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaVertexBatchExecutorTest { + private static final Logger LOGGER = + LoggerFactory.getLogger(NebulaVertexBatchExecutorTest.class); + + String ip = "127.0.0.1"; + VertexExecutionOptions.ExecutionOptionBuilder builder = null; + Map schema = new HashMap<>(); + Row row1 = new Row(9); + Row row2 = new Row(9); + + Session session = null; + + @Before + public void before() { + MockData.mockSchema(); + builder = new VertexExecutionOptions.ExecutionOptionBuilder() + 
.setTag("person") + .setIdIndex(0) + .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7", + "col8")) + .setPositions(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); + + schema.put("col1", PropertyType.STRING.getValue()); + schema.put("col2", PropertyType.FIXED_STRING.getValue()); + schema.put("col3", PropertyType.INT32.getValue()); + schema.put("col4", PropertyType.DOUBLE.getValue()); + schema.put("col5", PropertyType.DATE.getValue()); + schema.put("col6", PropertyType.DATETIME.getValue()); + schema.put("col7", PropertyType.TIME.getValue()); + schema.put("col8", PropertyType.TIMESTAMP.getValue()); + + row1.setField(0, 1); + row1.setField(1, "Tom"); + row1.setField(2, "Tom"); + row1.setField(3, 10); + row1.setField(4, 1.0); + row1.setField(5, "2021-01-01"); + row1.setField(6, "2021-01-01T12:00:00"); + row1.setField(7, "12:00:00"); + row1.setField(8, 372435234); + + row2.setField(0, 2); + row2.setField(1, "Jina"); + row2.setField(2, "Jina"); + row2.setField(3, 20); + row2.setField(4, 2.0); + row2.setField(5, "2021-02-01"); + row2.setField(6, "2021-02-01T12:00:00"); + row2.setField(7, "15:00:00"); + row2.setField(8, 392435234); + + // get Session + NebulaPoolConfig poolConfig = new NebulaPoolConfig(); + NebulaPool pool = new NebulaPool(); + + try { + pool.init(Arrays.asList(new HostAddress(ip, 9669)), poolConfig); + session = pool.getSession("root", "nebula", true); + } catch (Exception e) { + LOGGER.error("init nebula pool error, ", e); + assert (false); + } + } + + /** + * test addToBatch for INSERT write mode + */ + @Test + public void testAddToBatchWithInsert() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for INSERT write mode + */ + @Test + public void 
testAddToBatchWithInsertPolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for UPDATE write mode + */ + @Test + public void testAddToBatchWithUpdate() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for UPDATE write mode + */ + @Test + public void testAddToBatchWithUpdatePolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for DELETE write mode + */ + @Test + public void testAddToBatchWithDelete() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test addToBatch for DELETE write mode + */ + @Test + public void testAddToBatchWithDeletePolicy() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + } + + /** + * test batch execute for int vid and insert mode + 
*/ + @Test + public void testExecuteBatch() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch exeucte for int vid and UPDATE mode + */ + @Test + public void testExecuteBatchWithUpdate() { + ExecutionOptions options = builder + .setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch exeucte for int vid and DELETE mode + */ + @Test + public void testExecuteBatchWithDelete() { + ExecutionOptions options = builder.setGraphSpace("test_int") + .setPolicy("HASH") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new 
NebulaVertexBatchExecutor<>(options, VidTypeEnum.INT, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_int"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch exeucte for string vid and insert mode + */ + @Test + public void testExecuteBatchWithStringVidAndInsert() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.INSERT) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.STRING, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch execute for string vid and update mode + */ + @Test + public void testExecuteBatchWithStringVidAndUpdate() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.UPDATE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.STRING, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) 
{ + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + + /** + * test batch execute for string vid and DELETE mode + */ + @Test + public void testExecuteBatchWithStringVidAndDelete() { + ExecutionOptions options = builder + .setGraphSpace("test_string") + .setWriteMode(WriteModeEnum.DELETE) + .builder(); + NebulaVertexBatchExecutor vertexBatchExecutor = + new NebulaVertexBatchExecutor<>(options, VidTypeEnum.STRING, schema); + vertexBatchExecutor.addToBatch(row1); + vertexBatchExecutor.addToBatch(row2); + + ResultSet resultSet = null; + try { + resultSet = session.execute("USE test_string"); + } catch (IOErrorException e) { + LOGGER.error("switch space error,", e); + assert (false); + } + if (!resultSet.isSucceeded()) { + LOGGER.error("switch space failed,{}", resultSet.getErrorMessage()); + assert (false); + } + + String statement = vertexBatchExecutor.executeBatch(session); + assert (statement == null); + } + } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java index ef58817..459dac4 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaEdgesTest.java @@ -1,8 +1,161 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.utils;public class NebulaEdgesTest { + +package org.apache.flink.connector.nebula.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import junit.framework.TestCase; + +public class NebulaEdgesTest extends TestCase { + + List edges = new ArrayList<>(); + String edgeName = "friend"; + List propNames = Arrays.asList( + "col_string", + "col_fixed_string", + "col_bool", + "col_int", + "col_int64", + "col_double", + "col_date"); + List props1 = Arrays.asList("\"Tom\"", "\"Tom\"", "true", "10", "100", "1.0", + "2021-11-12"); + List props2 = Arrays.asList("\"Bob\"", "\"Bob\"", "false", "20", "200", "2.0", + "2021-05-01"); + + + public void testGetInsertStatement() { + edges.add(new NebulaEdge("\"vid1\"", "\"vid2\"", null, props1)); + edges.add(new NebulaEdge("\"vid2\"", "\"vid1\"", null, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, null, null); + String edgeStatement = nebulaEdges.getInsertStatement(); + + String expectStatement = + "INSERT EDGE `friend`(`col_string`,`col_fixed_string`,`col_bool`,`col_int`," + + "`col_int64`,`col_double`,`col_date`) VALUES " + + "\"vid1\"->\"vid2\": (" + String.join(",", props1) + ")," + + "\"vid2\"->\"vid1\": (" + String.join(",", props2) + ")"; + assert (edgeStatement.equals(expectStatement)); + } + + public void testGetInsertStatementWithRank() { + edges.add(new NebulaEdge("\"vid1\"", "\"vid2\"", 1L, props1)); + edges.add(new NebulaEdge("\"vid2\"", "\"vid1\"", 2L, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, null, null); + String edgeStatement = nebulaEdges.getInsertStatement(); + + String expectStatement = + "INSERT EDGE `friend`(`col_string`,`col_fixed_string`,`col_bool`,`col_int`," + + "`col_int64`,`col_double`,`col_date`) VALUES " + + "\"vid1\"->\"vid2\"@1: (" + String.join(",", props1) + ")," + + "\"vid2\"->\"vid1\"@2: (" + String.join(",", props2) + ")"; + assert 
(edgeStatement.equals(expectStatement)); + } + + public void testGetInsertStatementWithPolicy() { + edges.add(new NebulaEdge("vid1", "vid2", null, props1)); + edges.add(new NebulaEdge("vid2", "vid1", null, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, PolicyEnum.HASH, + PolicyEnum.HASH); + String edgeStatement = nebulaEdges.getInsertStatement(); + + String expectStatement = + "INSERT EDGE `friend`(`col_string`,`col_fixed_string`,`col_bool`,`col_int`," + + "`col_int64`,`col_double`,`col_date`) VALUES " + + "HASH(\"vid1\")->HASH(\"vid2\"): (" + String.join(",", props1) + ")," + + "HASH(\"vid2\")->HASH(\"vid1\"): (" + String.join(",", props2) + ")"; + assert (edgeStatement.equals(expectStatement)); + } + + public void testGetInsertStatementWithRankAndPolicy() { + edges.add(new NebulaEdge("vid1", "vid2", 1L, props1)); + edges.add(new NebulaEdge("vid2", "vid1", 2L, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, PolicyEnum.HASH, + PolicyEnum.HASH); + String edgeStatement = nebulaEdges.getInsertStatement(); + + String expectStatement = + "INSERT EDGE `friend`(`col_string`,`col_fixed_string`,`col_bool`,`col_int`," + + "`col_int64`,`col_double`,`col_date`) VALUES " + + "HASH(\"vid1\")->HASH(\"vid2\")@1: (" + String.join(",", props1) + ")," + + "HASH(\"vid2\")->HASH(\"vid1\")@2: (" + String.join(",", props2) + ")"; + assert (edgeStatement.equals(expectStatement)); + } + + public void testGetUpdateStatement() { + edges.add(new NebulaEdge("\"vid1\"", "\"vid2\"", null, props1)); + edges.add(new NebulaEdge("\"vid2\"", "\"vid1\"", null, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, null, null); + String edgeStatement = nebulaEdges.getUpdateStatement(); + + String expectStatement = + "UPDATE EDGE ON `friend` \"vid1\"->\"vid2\"@0 SET `col_string`=\"Tom\"," + + "`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10,`col_int64`=100," + + 
"`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE EDGE ON `friend` \"vid2\"->\"vid1\"@0 SET `col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20," + + "`col_int64`=200," + + "`col_double`=2.0,`col_date`=2021-05-01"; + assert (edgeStatement.equals(expectStatement)); + } + + public void testGetUpdateStatementWithRank() { + edges.add(new NebulaEdge("\"vid1\"", "\"vid2\"", 1L, props1)); + edges.add(new NebulaEdge("\"vid2\"", "\"vid1\"", 2L, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, null, null); + String edgeStatement = nebulaEdges.getUpdateStatement(); + + String expectStatement = + "UPDATE EDGE ON `friend` \"vid1\"->\"vid2\"@1 SET `col_string`=\"Tom\"," + + "`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10,`col_int64`=100," + + "`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE EDGE ON `friend` \"vid2\"->\"vid1\"@2 SET `col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20," + + "`col_int64`=200,`col_double`=2.0,`col_date`=2021-05-01"; + assert (edgeStatement.equals(expectStatement)); + } + + public void testGetUpdateStatementWithPolicy() { + edges.add(new NebulaEdge("vid1", "vid2", null, props1)); + edges.add(new NebulaEdge("vid2", "vid1", null, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, PolicyEnum.HASH, + PolicyEnum.HASH); + String edgeStatement = nebulaEdges.getUpdateStatement(); + + String expectStatement = + "UPDATE EDGE ON `friend` HASH(\"vid1\")->HASH(\"vid2\")@0 SET " + + "`col_string`=\"Tom\"," + + "`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10,`col_int64`=100," + + "`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE EDGE ON `friend` HASH(\"vid2\")->HASH(\"vid1\")@0 SET " + + "`col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20," + + "`col_int64`=200,`col_double`=2.0,`col_date`=2021-05-01"; + assert (edgeStatement.equals(expectStatement)); + } + + + public void 
testGetUpdateStatementWithRankAndPolicy() { + edges.add(new NebulaEdge("vid1", "vid2", 1L, props1)); + edges.add(new NebulaEdge("vid2", "vid1", 2L, props2)); + NebulaEdges nebulaEdges = new NebulaEdges(edgeName, propNames, edges, PolicyEnum.HASH, + PolicyEnum.HASH); + String edgeStatement = nebulaEdges.getUpdateStatement(); + + String expectStatement = + "UPDATE EDGE ON `friend` HASH(\"vid1\")->HASH(\"vid2\")@1 SET " + + "`col_string`=\"Tom\"," + + "`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10,`col_int64`=100," + + "`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE EDGE ON `friend` HASH(\"vid2\")->HASH(\"vid1\")@2 SET " + + "`col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20," + + "`col_int64`=200,`col_double`=2.0,`col_date`=2021-05-01"; + assert (edgeStatement.equals(expectStatement)); + } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java index 2d745eb..7cc1a5e 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java @@ -1,8 +1,116 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. +/* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License, * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
*/ - -package org.apache.flink.connector.nebula.utils;public class NebulaVerticesTest { + +package org.apache.flink.connector.nebula.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import junit.framework.TestCase; + +public class NebulaVerticesTest extends TestCase { + + List vertices = new ArrayList<>(); + String tagName = "person"; + List propNames = Arrays.asList( + "col_string", + "col_fixed_string", + "col_bool", + "col_int", + "col_int64", + "col_double", + "col_date"); + List props1 = Arrays.asList("\"Tom\"", "\"Tom\"", "true", "10", "100", "1.0", + "2021-11-12"); + List props2 = Arrays.asList("\"Bob\"", "\"Bob\"", "false", "20", "200", "2.0", + "2021-05-01"); + + + public void testGetInsertStatement() { + vertices.add(new NebulaVertex("\"vid1\"", props1)); + vertices.add(new NebulaVertex("\"vid2\"", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, null); + String vertexStatement = nebulaVertices.getInsertStatement(); + + String expectStatement = "INSERT VERTEX `person`(`col_string`,`col_fixed_string`," + + "`col_bool`," + + "`col_int`,`col_int64`,`col_double`,`col_date`) VALUES \"vid1\": (" + + String.join(",", props1) + + "),\"vid2\": (" + String.join(",", props2) + ")"; + assert (expectStatement.equals(vertexStatement)); + } + + + public void testGetInsertStatementWithPolicy() { + vertices.add(new NebulaVertex("vid1", props1)); + vertices.add(new NebulaVertex("vid2", props2)); + + NebulaVertices nebulaVerticesWithPolicy = new NebulaVertices(tagName, propNames, vertices, + PolicyEnum.HASH); + String vertexStatementWithPolicy = nebulaVerticesWithPolicy.getInsertStatement(); + + String expectStatementWithPolicy = "INSERT VERTEX `person`(`col_string`," + + "`col_fixed_string`,`col_bool`," + + "`col_int`,`col_int64`,`col_double`,`col_date`) VALUES HASH(\"vid1\"): (" + + String.join(",", props1) + + "),HASH(\"vid2\"): (" + String.join(",", props2) + ")"; + assert 
(expectStatementWithPolicy.equals(vertexStatementWithPolicy)); + } + + public void testGetUpdateStatement() { + vertices.add(new NebulaVertex("\"vid1\"", props1)); + vertices.add(new NebulaVertex("\"vid2\"", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, null); + String vertexStatement = nebulaVertices.getUpdateStatement(); + String expectStatement = "UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"Tom\"," + + "`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10,`col_int64`=100," + + "`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE VERTEX ON `person` \"vid2\" SET `col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20,`col_int64`=200," + + "`col_double`=2.0,`col_date`=2021-05-01"; + assert (vertexStatement.equals(expectStatement)); + } + + public void testGetUpdateStatementWithPolicy() { + vertices.add(new NebulaVertex("vid1", props1)); + vertices.add(new NebulaVertex("vid2", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, + PolicyEnum.HASH); + String vertexStatement = nebulaVertices.getUpdateStatement(); + String expectStatement = "UPDATE VERTEX ON `person` HASH(\"vid1\") SET " + + "`col_string`=\"Tom\",`col_fixed_string`=\"Tom\",`col_bool`=true,`col_int`=10," + + "`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;" + + "UPDATE VERTEX ON `person` HASH(\"vid2\") SET `col_string`=\"Bob\"," + + "`col_fixed_string`=\"Bob\",`col_bool`=false,`col_int`=20,`col_int64`=200," + + "`col_double`=2.0,`col_date`=2021-05-01"; + assert (vertexStatement.equals(expectStatement)); + } + + public void testGetDeleteStatement() { + vertices.add(new NebulaVertex("\"vid1\"", props1)); + vertices.add(new NebulaVertex("\"vid2\"", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, null); + String vertexStatement = nebulaVertices.getDeleteStatement(); + String expectStatement = "DELETE VERTEX 
\"vid1\",\"vid2\""; + assert (vertexStatement.equals(expectStatement)); + } + + public void testGetDeleteStatementWithPolicy() { + vertices.add(new NebulaVertex("vid1", props1)); + vertices.add(new NebulaVertex("vid2", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, + PolicyEnum.HASH); + String vertexStatement = nebulaVertices.getDeleteStatement(); + String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\")"; + assert (vertexStatement.equals(expectStatement)); + } + } From b0e64318634f7a579b1e0abbd9a2268464c287e2 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 16 Sep 2021 15:59:42 +0800 Subject: [PATCH 4/6] add example for UPDATE and DELETE write mode --- .../apache/flink/FlinkConnectorExample.java | 126 +++++++++++++++--- 1 file changed, 111 insertions(+), 15 deletions(-) diff --git a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java index 5be800c..4ca155d 100644 --- a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java +++ b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java @@ -17,12 +17,16 @@ import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.WriteModeEnum; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * make sure your nebula graph has create Space + */ public class FlinkConnectorExample { private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class); @@ -30,6 +34,8 @@ public static void main(String[] args) { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> playerSource = constructVertexSourceData(env); sinkVertexData(env, playerSource); + updateVertexData(env, playerSource); + deleteVertexData(env, playerSource); DataStream> friendSource = constructEdgeSourceData(env); sinkEdgeData(env, friendSource); } @@ -40,14 +46,14 @@ public static void main(String[] args) { public static DataStream> constructVertexSourceData( StreamExecutionEnvironment env) { List> player = new ArrayList<>(); - List fields1 = Arrays.asList("15", "Bob", "18"); - List fields2 = Arrays.asList("16", "Tina", "19"); - List fields3 = Arrays.asList("17", "Jena", "20"); - List fields4 = Arrays.asList("18", "Tom", "20"); - List fields5 = Arrays.asList("19", "Viki", "55"); - List fields6 = Arrays.asList("20", "Jime", "32"); - List fields7 = Arrays.asList("21", "Jhon", "76"); - List fields8 = Arrays.asList("22", "Crea", "10"); + List fields1 = Arrays.asList("15", "Bob", "38"); + List fields2 = Arrays.asList("16", "Tina", "39"); + List fields3 = Arrays.asList("17", "Jena", "30"); + List fields4 = Arrays.asList("18", "Tom", "30"); + List fields5 = Arrays.asList("19", "Viki", "35"); + List fields6 = Arrays.asList("20", "Jime", "33"); + List fields7 = Arrays.asList("21", "Jhon", "36"); + List fields8 = Arrays.asList("22", "Crea", "30"); player.add(fields1); player.add(fields2); player.add(fields3); @@ -61,15 +67,15 @@ public static DataStream> constructVertexSourceData( } /** - * sink Nebula Graph + * sink Nebula Graph with default INSERT mode */ public static void sinkVertexData(StreamExecutionEnvironment env, DataStream> playerSource) { NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() - .setGraphAddress("127.0.0.1:3699") - .setMetaAddress("127.0.0.1:45500") - .build(); + .setGraphAddress("127.0.0.1:9669") + .setMetaAddress("127.0.0.1:9559") + .build(); NebulaGraphConnectionProvider graphConnectionProvider = new 
NebulaGraphConnectionProvider(nebulaClientOptions); NebulaMetaConnectionProvider metaConnectionProvider = @@ -104,6 +110,96 @@ public static void sinkVertexData(StreamExecutionEnvironment env, } } + /** + * sink Nebula Graph with UPDATE mode + */ + public static void updateVertexData(StreamExecutionEnvironment env, + DataStream> playerSource) { + NebulaClientOptions nebulaClientOptions = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setGraphAddress("127.0.0.1:9669") + .setMetaAddress("127.0.0.1:9559") + .build(); + NebulaGraphConnectionProvider graphConnectionProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + NebulaMetaConnectionProvider metaConnectionProvider = + new NebulaMetaConnectionProvider(nebulaClientOptions); + + ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("flinkSink") + .setTag("player") + .setIdIndex(0) + .setFields(Arrays.asList("name", "age")) + .setPositions(Arrays.asList(1, 2)) + .setWriteMode(WriteModeEnum.UPDATE) + .setBatch(2) + .builder(); + + NebulaBatchOutputFormat outPutFormat = + new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider) + .setExecutionOptions(executionOptions); + NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat); + DataStream dataStream = playerSource.map(row -> { + org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size()); + for (int i = 0; i < row.size(); i++) { + record.setField(i, row.get(i)); + } + return record; + }); + dataStream.addSink(nebulaSinkFunction); + try { + env.execute("Update Nebula Vertex"); + } catch (Exception e) { + LOG.error("error when update Nebula Graph Vertex, ", e); + System.exit(-1); + } + } + + /** + * sink Nebula Graph with DELETE mode + */ + public static void deleteVertexData(StreamExecutionEnvironment env, + DataStream> playerSource) { + NebulaClientOptions nebulaClientOptions = + new NebulaClientOptions.NebulaClientOptionsBuilder() + 
.setGraphAddress("127.0.0.1:9669") + .setMetaAddress("127.0.0.1:9559") + .build(); + NebulaGraphConnectionProvider graphConnectionProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + NebulaMetaConnectionProvider metaConnectionProvider = + new NebulaMetaConnectionProvider(nebulaClientOptions); + + ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() + .setGraphSpace("flinkSink") + .setTag("player") + .setIdIndex(0) + .setFields(Arrays.asList("name", "age")) + .setPositions(Arrays.asList(1, 2)) + .setWriteMode(WriteModeEnum.DELETE) + .setBatch(2) + .builder(); + + NebulaBatchOutputFormat outPutFormat = + new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider) + .setExecutionOptions(executionOptions); + NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat); + DataStream dataStream = playerSource.map(row -> { + org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size()); + for (int i = 0; i < row.size(); i++) { + record.setField(i, row.get(i)); + } + return record; + }); + dataStream.addSink(nebulaSinkFunction); + try { + env.execute("Update Nebula Vertex"); + } catch (Exception e) { + LOG.error("error when update Nebula Graph Vertex, ", e); + System.exit(-1); + } + } + /** * construct flink data source @@ -133,9 +229,9 @@ public static void sinkEdgeData(StreamExecutionEnvironment env, DataStream> playerSource) { NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() - .setGraphAddress("127.0.0.1:3699") - .setMetaAddress("127.0.0.1:45500") - .build(); + .setGraphAddress("127.0.0.1:9669") + .setMetaAddress("127.0.0.1:9559") + .build(); NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); NebulaMetaConnectionProvider metaConnectionProvider = From 9bb1cbd07b9c4fb6dc787e19ea3c921ea833284d Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 16 Sep 2021 16:19:44 
+0800 Subject: [PATCH 5/6] fix test --- .../connector/nebula/sink/NebulaEdgeBatchExecutorTest.java | 2 ++ .../nebula/sink/NebulaOutputFormatConverterTest.java | 4 ++-- .../connector/nebula/sink/NebulaVertexBatchExecutorTest.java | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java index 34893be..d17c392 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaEdgeBatchExecutorTest.java @@ -218,6 +218,7 @@ public void testExecuteBatch() { */ @Test public void testExecuteBatchWithUpdate() { + testExecuteBatch(); ExecutionOptions options = builder .setGraphSpace("test_int") .setPolicy("HASH") @@ -309,6 +310,7 @@ public void testExecuteBatchWithStringVidAndInsert() { */ @Test public void testExecuteBatchWithStringVidAndUpdate() { + testExecuteBatchWithStringVidAndInsert(); ExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.UPDATE) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java index 979a5ab..8e61986 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaOutputFormatConverterTest.java @@ -180,8 +180,8 @@ public void testIntVidEdge() { schema); NebulaEdge edge = helper.createEdge(row, PolicyEnum.HASH); - assert (edge.getSource().equals("HASH(\"Tom\")")); - assert (edge.getTarget().equals("HASH(\"Jena\")")); + assert (edge.getSource().equals("Tom")); + assert (edge.getTarget().equals("Jena")); assert 
(edge.getRank() == null); assert (edge.getPropValues().size() == 1); assert (edge.getPropValuesString().equals("12.0")); diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java index 052fb6c..214cb6f 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/sink/NebulaVertexBatchExecutorTest.java @@ -214,6 +214,7 @@ public void testExecuteBatch() { */ @Test public void testExecuteBatchWithUpdate() { + testExecuteBatch(); ExecutionOptions options = builder .setGraphSpace("test_int") .setPolicy("HASH") @@ -305,6 +306,7 @@ public void testExecuteBatchWithStringVidAndInsert() { */ @Test public void testExecuteBatchWithStringVidAndUpdate() { + testExecuteBatchWithStringVidAndInsert(); ExecutionOptions options = builder .setGraphSpace("test_string") .setWriteMode(WriteModeEnum.UPDATE) From 464586f6df6f3eb45f60836f8afa93939c14065b Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 14 Oct 2021 13:48:17 +0800 Subject: [PATCH 6/6] add error log --- .../connector/nebula/sink/NebulaEdgeBatchExecutor.java | 2 +- .../nebula/sink/NebulaRowEdgeOutputFormatConverter.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java index ef25f65..9c98e70 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java @@ -63,7 +63,7 @@ String executeBatch(Session session) { statement = nebulaEdges.getDeleteStatement(); break; default: - throw new IllegalArgumentException("write mode {} is not 
supported"); + throw new IllegalArgumentException("write mode is not supported"); } LOG.debug("write statement={}", statement); diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java index df07820..b51e3f6 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowEdgeOutputFormatConverter.java @@ -56,6 +56,7 @@ public NebulaEdge createEdge(Row row, PolicyEnum policy) { Object srcId = row.getField(srcIdIndex); Object dstId = row.getField(dstIdIndex); if (srcId == null || dstId == null) { + Log.error("null srcId or dstId"); return null; } // extract edge properties