From b6ab39f8e2362c93cbad319916632cfb8ee5cb02 Mon Sep 17 00:00:00 2001
From: Anqi <16240361+Nicole00@users.noreply.github.com>
Date: Sat, 10 Oct 2020 17:06:09 +0800
Subject: [PATCH] Support nebula-spark reader v1.0 (#155)
* Support nebula-spark reader v1.0
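
Usage sketch (distilled from NebulaReaderExample in this patch; the space "nb",
label "player" and address 127.0.0.1:45500 are placeholders for your own
deployment):

    SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("nebula-reader")
            .getOrCreate();

    Dataset<Row> vertices = spark.read()
            .format("nebula")                                // short name registered via DataSourceRegister
            .option(Parameters.TYPE, Type.VERTEX.getType())  // "VERTEX" or "EDGE"
            .option(Parameters.HOST_AND_PORTS, "127.0.0.1:45500")
            .option(Parameters.PARTITION_NUMBER, "100")
            .option(Parameters.SPACE_NAME, "nb")
            .option(Parameters.LABEL, "player")
            .option(Parameters.RETURN_COLS, "")              // blank -> return all properties
            .load();
    vertices.printSchema();
    vertices.show();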
---
.gitignore | 1 +
client/pom.xml | 2 +-
tools/nebula-spark/pom.xml | 80 ++++++++++++-
.../com/vesoft/nebula/bean/ConnectInfo.java | 50 ---------
.../vesoft/nebula/bean/DataSourceConfig.java | 92 +++++++++++++++
.../com/vesoft/nebula/bean/Parameters.java | 17 +++
.../java/com/vesoft/nebula/bean/ScanInfo.java | 74 ------------
.../com/vesoft/nebula/common/Checkable.java | 13 ---
.../java/com/vesoft/nebula/common/Type.java | 2 +-
.../nebula/example/NebulaReaderExample.java | 30 +++--
.../nebula/reader/AbstractNebulaIterator.java | 48 +++-----
.../vesoft/nebula/reader/DefaultSource.java | 77 +++++++++++++
.../nebula/reader/NebulaDataSource.java | 47 --------
.../vesoft/nebula/reader/NebulaPartition.java | 9 +-
.../com/vesoft/nebula/reader/NebulaRDD.java | 39 ++++---
.../vesoft/nebula/reader/NebulaRelation.java | 106 +++++++++++-------
.../nebula/reader/ScanEdgeIterator.java | 64 ++++++-----
.../nebula/reader/ScanVertexIterator.java | 62 +++++-----
.../vesoft/nebula/util/DataTypeConverter.java | 18 +++
...pache.spark.sql.sources.DataSourceRegister | 2 +
20 files changed, 471 insertions(+), 362 deletions(-)
delete mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ConnectInfo.java
create mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/DataSourceConfig.java
create mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/Parameters.java
delete mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ScanInfo.java
delete mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Checkable.java
create mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/DefaultSource.java
delete mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaDataSource.java
create mode 100644 tools/nebula-spark/src/main/java/com/vesoft/nebula/util/DataTypeConverter.java
create mode 100644 tools/nebula-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git a/.gitignore b/.gitignore
index 06163b7b5..84e7a6bca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,4 @@ target/
spark-importer.ipr
spark-importer.iws
+.DS_Store
diff --git a/client/pom.xml b/client/pom.xml
index b26194fad..0dbb9d696 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -174,7 +174,7 @@
           <groupId>org.codehaus.mojo</groupId>
           <artifactId>findbugs-maven-plugin</artifactId>
-          <version>3.0.6-SNAPSHOT</version>
+          <version>3.0.4</version>
           <configuration>
             <effort>High</effort>
             <threshold>Default</threshold>
diff --git a/tools/nebula-spark/pom.xml b/tools/nebula-spark/pom.xml
index 9aa7beb3f..ff6238fc3 100644
--- a/tools/nebula-spark/pom.xml
+++ b/tools/nebula-spark/pom.xml
@@ -21,17 +21,17 @@
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.12</artifactId>
+      <artifactId>spark-core_2.11</artifactId>
       <version>${spark.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-graphx_2.12</artifactId>
+      <artifactId>spark-sql_2.11</artifactId>
       <version>${spark.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_2.12</artifactId>
+      <artifactId>spark-graphx_2.11</artifactId>
       <version>${spark.version}</version>
@@ -43,6 +43,21 @@
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>3.2.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
@@ -51,6 +66,65 @@
          <target>${compiler.target.version}</target>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.2.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <artifactSet>
+                <excludes>
+                  <exclude>org.apache.spark:*</exclude>
+                  <exclude>org.apache.hadoop:*</exclude>
+                  <exclude>org.apache.hive:*</exclude>
+                  <exclude>log4j:log4j</exclude>
+                  <exclude>org.apache.orc:*</exclude>
+                  <exclude>xml-apis:xml-apis</exclude>
+                  <exclude>javax.inject:javax.inject</exclude>
+                  <exclude>org.spark-project.hive:hive-exec</exclude>
+                  <exclude>stax:stax-api</exclude>
+                  <exclude>org.glassfish.hk2.external:aopalliance-repackaged</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>com/vesoft/tools/**</exclude>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.2.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ConnectInfo.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ConnectInfo.java
deleted file mode 100644
index 43cea8cde..000000000
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ConnectInfo.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* Copyright (c) 2020 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft.nebula.bean;
-
-import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
-import com.vesoft.nebula.common.Checkable;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.lang.StringUtils;
-
-public class ConnectInfo implements Checkable, Serializable {
-
- private String spaceName;
-
- private String hostAndPorts;
-
- public ConnectInfo(String spaceName, String hostAndPorts) {
- this.spaceName = spaceName;
- this.hostAndPorts = hostAndPorts;
- check();
- }
-
- public String getSpaceName() {
- return this.spaceName;
- }
-
-    public List<HostAndPort> getHostAndPorts() {
-        List<HostAndPort> hostAndPortList = new ArrayList<>();
- String[] hostAndPortArray = hostAndPorts.split(",");
- for (String hostAndPort : hostAndPortArray) {
- hostAndPortList.add(HostAndPort.fromString(hostAndPort));
- }
- return hostAndPortList;
- }
-
- @Override
- public void check() throws IllegalArgumentException {
- Preconditions.checkArgument(StringUtils.isNotEmpty(spaceName),
- "The spaceName can't be null or empty");
-
- Preconditions.checkArgument(StringUtils.isNotEmpty(hostAndPorts),
- "The hostAndPorts can't be null or empty");
- }
-}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/DataSourceConfig.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/DataSourceConfig.java
new file mode 100644
index 000000000..218a53964
--- /dev/null
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/DataSourceConfig.java
@@ -0,0 +1,92 @@
+/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.bean;
+
+import com.google.common.net.HostAndPort;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+public class DataSourceConfig implements Serializable {
+
+ private final String nameSpace;
+
+ private final String type;
+
+ private final String label;
+
+ private final String returnColString;
+
+ private boolean allCols = false;
+
+ private final int partitionNumber;
+
+ private final String hostAndPorts;
+
+    /**
+     * @param nameSpace       nebula graph space name
+     * @param type            scan element type, VERTEX or EDGE
+     * @param label           tag name or edge type to scan
+     * @param returnColString comma-separated properties to return, e.g. name,age
+     * @param partitionNumber spark partition number
+     * @param hostAndPorts    comma-separated meta server addresses, e.g. 127.0.0.1:45500
+     */
+ public DataSourceConfig(String nameSpace, String type, String label, String returnColString, int partitionNumber, String hostAndPorts) {
+ this.nameSpace = nameSpace;
+ this.type = type;
+ this.label = label;
+ this.returnColString = returnColString;
+ this.partitionNumber = partitionNumber;
+ this.hostAndPorts = hostAndPorts;
+ }
+
+ public String getNameSpace() {
+ return nameSpace;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public int getPartitionNumber() {
+ return partitionNumber;
+ }
+
+ public boolean getAllCols() {
+ return allCols;
+ }
+
+    public Map<String, List<String>> getReturnColMap() {
+        Map<String, List<String>> result = new HashMap<>(1);
+ if (StringUtils.isBlank(returnColString)) {
+ allCols = true;
+ result.put(label, new ArrayList<>());
+ } else {
+ List properties = Arrays.asList(returnColString.split(","));
+ result.put(label, properties);
+ }
+ return result;
+ }
+
+    public List<HostAndPort> getHostAndPorts() {
+        List<HostAndPort> hostAndPortList = new ArrayList<>();
+ String[] hostAndPortArray = hostAndPorts.split(",");
+ for (String hostAndPort : hostAndPortArray) {
+ hostAndPortList.add(HostAndPort.fromString(hostAndPort));
+ }
+ return hostAndPortList;
+ }
+}
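
A note on the returnCols contract, as a minimal sketch (the space, label and
address values below are made up): a blank returnColString maps the label to an
empty column list and flips allCols as a side effect, which downstream means
"scan every property". Note that getAllCols() is only meaningful after
getReturnColMap() has run.

    // Hypothetical values, illustrating the two branches of getReturnColMap().
    DataSourceConfig all = new DataSourceConfig("nb", "VERTEX", "player",
            "", 100, "127.0.0.1:45500");
    all.getReturnColMap();   // {player=[]}
    all.getAllCols();        // true (set as a side effect of the call above)

    DataSourceConfig some = new DataSourceConfig("nb", "VERTEX", "player",
            "name,age", 100, "127.0.0.1:45500");
    some.getReturnColMap();  // {player=[name, age]}
    some.getAllCols();       // false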
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/Parameters.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/Parameters.java
new file mode 100644
index 000000000..8cfeb953e
--- /dev/null
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/Parameters.java
@@ -0,0 +1,17 @@
+/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.bean;
+
+public class Parameters {
+
+ public static final String SPACE_NAME = "spaceName";
+ public static final String LABEL = "label";
+ public static final String TYPE = "type";
+ public static final String HOST_AND_PORTS = "hostAndPorts";
+ public static final String RETURN_COLS = "returnCols";
+ public static final String PARTITION_NUMBER = "partitionNumber";
+}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ScanInfo.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ScanInfo.java
deleted file mode 100644
index 956cc765f..000000000
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/bean/ScanInfo.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/* Copyright (c) 2020 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft.nebula.bean;
-
-import com.google.common.base.Preconditions;
-import com.vesoft.nebula.common.Type;
-import com.vesoft.nebula.common.Checkable;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-public class ScanInfo implements Checkable, Serializable {
-
- private String scanType;
-
- private String returnColString;
-
- private int partitionNumber;
-
- private static final String RETURN_COL_REGEX = "(\\w+=(\\w+)(,\\w+)*)(;\\w+=(\\w+)(,\\w+)*)*";
-
- /**
- * @param scanType scan element type
- * @param returnColString return col string example: labela=name,age;labelb=name
- */
- public ScanInfo(String scanType, String returnColString, int partitionNumber) {
- this.scanType = scanType;
- this.returnColString = returnColString;
- this.partitionNumber = partitionNumber;
- }
-
- @Override
- public void check() throws IllegalArgumentException {
- boolean isLegalType = Type.VERTEX.getType().equalsIgnoreCase(scanType)
- || Type.EDGE.getType().equalsIgnoreCase(scanType);
- Preconditions.checkArgument(isLegalType,
- "scan type '%s' is illegal, it should be '%s' or '%s'",
- scanType, Type.VERTEX.getType(), Type.EDGE.getType());
-
- boolean isReturnColLegal = Pattern.matches(RETURN_COL_REGEX, returnColString);
- Preconditions.checkArgument(isReturnColLegal,
- "return col string '%s' is illegal, the pattern should like a=b,c;d=e",
- returnColString);
- }
-
- public String getScanType() {
- return scanType;
- }
-
- public int getPartitionNumber() {
- return partitionNumber;
- }
-
-    public Map<String, List<String>> getReturnColMap() {
-        check();
-
-        Map<String, List<String>> result = new HashMap<>();
- String[] returnColSplits = returnColString.split(";");
- for (String returnColSplit : returnColSplits) {
- String[] labelPropertyMap = returnColSplit.split("=");
- String label = labelPropertyMap[0];
- List properties = Arrays.asList(labelPropertyMap[1].split(","));
- result.put(label, properties);
- }
- return result;
- }
-}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Checkable.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Checkable.java
deleted file mode 100644
index bb53a04cf..000000000
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Checkable.java
+++ /dev/null
@@ -1,13 +0,0 @@
-/* Copyright (c) 2020 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft.nebula.common;
-
-public interface Checkable {
-
- void check() throws IllegalArgumentException;
-
-}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Type.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Type.java
index 7a2cb3e8e..4b891b8a6 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Type.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/common/Type.java
@@ -8,7 +8,7 @@
public enum Type {
- VERTEX("VERTEX"),EDGE("EDGE");
+ VERTEX("VERTEX"), EDGE("EDGE");
private String type;
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/example/NebulaReaderExample.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/example/NebulaReaderExample.java
index af92f0dd9..524084170 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/example/NebulaReaderExample.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/example/NebulaReaderExample.java
@@ -7,6 +7,7 @@
package com.vesoft.nebula.example;
import com.facebook.thrift.protocol.TCompactProtocol;
+import com.vesoft.nebula.bean.Parameters;
import com.vesoft.nebula.common.Type;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
@@ -31,30 +32,35 @@ public static void main(String[] args) {
        Dataset<Row> vertexDataset = sparkSession
.read()
- .format("com.vesoft.nebula.reader.NebulaDataSource")
- .option("importType", Type.VERTEX.getType())
- .option("hostAndPorts", "127.0.0.1:45500")
- .option("spaceName", "test")
- .option("returnCols", "course=name;building=name;student=name")
+ .format("nebula")
+ .option(Parameters.TYPE, Type.VERTEX.getType())
+ .option(Parameters.HOST_AND_PORTS, "127.0.0.1:45500")
+ .option(Parameters.PARTITION_NUMBER, "100")
+ .option(Parameters.SPACE_NAME, "nb")
+ .option(Parameters.LABEL, "player")
+                // if "returnCols" is empty or blank, all properties are returned
+ .option(Parameters.RETURN_COLS, "")
.load();
- LOGGER.info("vertex schema: ");
+ LOGGER.info("vertex course schema: ");
vertexDataset.printSchema();
vertexDataset.show();
        Dataset<Row> edgeDataset = sparkSession
.read()
- .format("com.vesoft.nebula.reader.NebulaDataSource")
- .option("importType", Type.EDGE.getType())
- .option("hostAndPorts", "127.0.0.1:45500")
- .option("spaceName", "test")
- .option("returnCols", "like=likeness;select=grade")
+ .format("nebula")
+ .option(Parameters.TYPE, Type.EDGE.getType())
+ .option(Parameters.HOST_AND_PORTS, "127.0.0.1:45500")
+ .option(Parameters.PARTITION_NUMBER, "100")
+ .option(Parameters.SPACE_NAME, "nb")
+ .option(Parameters.LABEL, "serve")
+                // if "returnCols" is empty or blank, all properties are returned
+ .option(Parameters.RETURN_COLS, "")
.load();
LOGGER.info("edge schema: ");
edgeDataset.printSchema();
edgeDataset.show();
LOGGER.info("vertex count: {}, edge count: {}", vertexDataset.count(), edgeDataset.count());
-
}
}
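
Once loaded, the datasets are ordinary Spark DataFrames, so standard SQL works
on them; for instance (illustrative only, assuming the scanned tag has an "age"
property):

    vertexDataset.createOrReplaceTempView("player");
    sparkSession.sql("SELECT * FROM player WHERE age > 30").show();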
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/AbstractNebulaIterator.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/AbstractNebulaIterator.java
index 70e621ecb..a95eb2389 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/AbstractNebulaIterator.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/AbstractNebulaIterator.java
@@ -7,18 +7,15 @@
package com.vesoft.nebula.reader;
import com.facebook.thrift.TException;
-import com.vesoft.nebula.bean.ConnectInfo;
-import com.vesoft.nebula.bean.ScanInfo;
+import com.vesoft.nebula.bean.DataSourceConfig;
import com.vesoft.nebula.exception.GraphConnectException;
import com.vesoft.nebula.client.meta.MetaClientImpl;
import com.vesoft.nebula.client.storage.StorageClientImpl;
import com.vesoft.nebula.client.storage.processor.Processor;
import com.vesoft.nebula.data.Result;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
+
import org.apache.spark.Partition;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -30,34 +27,21 @@ public abstract class AbstractNebulaIterator extends AbstractIterator<Row> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNebulaIterator.class);
-    protected Iterator<String[]> dataIterator;
+    protected Iterator<String> dataIterator;
     protected Iterator<Integer> scanPartIterator;
+    protected Map<String, List<Object>> resultValues = new HashMap<>();
protected StorageClientImpl storageClient;
protected MetaClientImpl metaClient;
protected Processor processor;
- protected ConnectInfo connectInfo;
     protected Map<String, List<String>> returnCols;
-    protected Map<String, List<Integer>> labelPropIndexMap;
-    protected int propSize;
-    public AbstractNebulaIterator(ConnectInfo connectInfo, Partition split,
-                                  ScanInfo scanInfo, Map<String, Integer> propIndexMap) {
- this.propSize = propIndexMap.size();
- this.connectInfo = connectInfo;
- this.returnCols = scanInfo.getReturnColMap();
- this.labelPropIndexMap = new HashMap<>();
-        for (Map.Entry<String, List<String>> colEntry : returnCols.entrySet()) {
-            List<Integer> colIndexs = new ArrayList<>();
-            List<String> propNames = colEntry.getValue();
- for (String propName : propNames) {
- colIndexs.add(propIndexMap.get(propName));
- }
- labelPropIndexMap.put(colEntry.getKey(), colIndexs);
- }
- LOGGER.info("labelPropIndexMap: {}", labelPropIndexMap);
- metaClient = new MetaClientImpl(connectInfo.getHostAndPorts());
+ public AbstractNebulaIterator(Partition split,
+ DataSourceConfig dataSourceConfig) {
+ this.returnCols = dataSourceConfig.getReturnColMap();
+
+ metaClient = new MetaClientImpl(dataSourceConfig.getHostAndPorts());
try {
metaClient.connect();
} catch (TException e) {
@@ -66,11 +50,11 @@ public AbstractNebulaIterator(ConnectInfo connectInfo, Partition split,
storageClient = new StorageClientImpl(metaClient);
// allocate scanPart to this partition
- int totalPart = metaClient.getPartsAlloc(connectInfo.getSpaceName()).size();
+ int totalPart = metaClient.getPartsAlloc(dataSourceConfig.getNameSpace()).size();
NebulaPartition nebulaPartition = (NebulaPartition) split;
        List<Integer> scanParts = nebulaPartition.getScanParts(totalPart,
- scanInfo.getPartitionNumber());
- LOGGER.info("partition index: {}, scanPart: {}", split.index(), scanParts);
+ dataSourceConfig.getPartitionNumber());
+ LOGGER.info("partition index: {}, scanPart: {}", split.index(), scanParts.toString());
scanPartIterator = scanParts.iterator();
}
@@ -79,8 +63,8 @@ public AbstractNebulaIterator(ConnectInfo connectInfo, Partition split,
@Override
public Row next() {
- return RowFactory.create(dataIterator.next());
+ return RowFactory.create(resultValues.get(dataIterator.next()).toArray());
}
-    protected abstract Iterator<String[]> process(Result result);
+    protected abstract Iterator<String> process(Result result);
}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/DefaultSource.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/DefaultSource.java
new file mode 100644
index 000000000..e8a50a800
--- /dev/null
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/DefaultSource.java
@@ -0,0 +1,77 @@
+/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+package com.vesoft.nebula.reader;
+
+import com.google.common.base.Preconditions;
+import com.vesoft.nebula.bean.Parameters;
+import com.vesoft.nebula.bean.DataSourceConfig;
+import com.vesoft.nebula.common.Type;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.RelationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+
+import java.util.regex.Pattern;
+
+public class DefaultSource implements RelationProvider, DataSourceRegister {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSource.class);
+ private static final String RETURN_COL_REGEX = "(\\w+)(,\\w+)*";
+
+ private DataSourceConfig dataSourceConfig;
+
+ @Override
+ public String shortName() {
+ return "nebula";
+ }
+
+ @Override
+    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
+ // check and parse parameter spaceName
+ Preconditions.checkArgument(parameters.get(Parameters.SPACE_NAME).isDefined(), "spaceName is not configured.");
+ String spaceName = parameters.get(Parameters.SPACE_NAME).get();
+
+ // check and parse parameter type
+ Preconditions.checkArgument(parameters.get(Parameters.TYPE).isDefined(), "type is not configured.");
+
+ String type = parameters.get(Parameters.TYPE).get();
+ Preconditions.checkArgument(type.equalsIgnoreCase(Type.EDGE.getType())
+ || type.equalsIgnoreCase(Type.VERTEX.getType()),
+ "type '%s' is illegal, it should be '%s' or '%s'",
+ type, Type.VERTEX.getType(), Type.EDGE.getType());
+
+ // check and parse parameter label
+ Preconditions.checkArgument(parameters.get(Parameters.LABEL).isDefined(), "label is not configured.");
+ String label = parameters.get(Parameters.LABEL).get();
+
+ // check and parse parameter hostAndPorts
+ Preconditions.checkArgument(parameters.get(Parameters.HOST_AND_PORTS).isDefined(), "hostAndPorts is not configured.");
+ String hostAndPorts = parameters.get(Parameters.HOST_AND_PORTS).get();
+
+ // check and parse parameter returnCols
+ Preconditions.checkArgument(parameters.get(Parameters.RETURN_COLS).isDefined(), "returnCols is not configured.");
+ String returnCols = parameters.get(Parameters.RETURN_COLS).get();
+ boolean isReturnColLegal = StringUtils.isBlank(returnCols) || Pattern.matches(RETURN_COL_REGEX, returnCols);
+ Preconditions.checkArgument(isReturnColLegal,
+ "returnCols '%s' is illegal, the pattern should be blank or string like a,b",
+ returnCols);
+
+ // check and parse parameter partitionNumber
+ Preconditions.checkArgument(parameters.get(Parameters.PARTITION_NUMBER).isDefined(), "partition is not configured.");
+ String partitionNumber = parameters.get(Parameters.PARTITION_NUMBER).get();
+
+ dataSourceConfig = new DataSourceConfig(spaceName, type,
+ label, returnCols, Integer.parseInt(partitionNumber), hostAndPorts);
+ LOGGER.info("dataSourceConfig: {}", dataSourceConfig);
+
+ return new NebulaRelation(sqlContext, dataSourceConfig);
+ }
+}
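
The shortName() above is what makes .format("nebula") resolve: Spark discovers
DataSourceRegister implementations through java.util.ServiceLoader, which reads
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. The
registration file added by this patch is not shown in this excerpt, but by the
ServiceLoader convention it should name the implementing class:

    com.vesoft.nebula.reader.DefaultSource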
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaDataSource.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaDataSource.java
deleted file mode 100644
index 1e766b5e9..000000000
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaDataSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* Copyright (c) 2020 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft.nebula.reader;
-
-import com.vesoft.nebula.bean.ConnectInfo;
-import com.vesoft.nebula.bean.ScanInfo;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.sources.BaseRelation;
-import org.apache.spark.sql.sources.RelationProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.immutable.Map;
-import scala.runtime.AbstractFunction0;
-
-public class NebulaDataSource implements RelationProvider {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(NebulaDataSource.class);
-
- private ConnectInfo connectInfo;
- private ScanInfo scanInfo;
-
- @Override
-    public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
- // parse and check parameters
- String spaceName = parameters.get("spaceName").get();
- String hostAndPorts = parameters.get("hostAndPorts").get();
- connectInfo = new ConnectInfo(spaceName, hostAndPorts);
- LOGGER.info("connectInfo, {}", connectInfo);
-
- String partitionNumber = parameters.getOrElse("partitionNumber",
-                new AbstractFunction0<String>() {
- @Override
- public String apply() {
- return String.valueOf(Runtime.getRuntime().availableProcessors());
- }
- });
- scanInfo = new ScanInfo(parameters.get("importType").get(),
- parameters.get("returnCols").get(), Integer.valueOf(partitionNumber));
- LOGGER.info("scanInfo: {}", scanInfo);
-
- return new NebulaRelation(sqlContext, connectInfo, scanInfo);
- }
-}
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaPartition.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaPartition.java
index bd4eb1b67..235caa5c6 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaPartition.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaPartition.java
@@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
+
import org.apache.spark.Partition;
public class NebulaPartition implements Partition {
@@ -25,19 +26,17 @@ public int index() {
/**
* allocate scanPart to partition
- * @param totalPart nebula data part num
- * @return scan data part list
+ *
+ * @param totalPart nebula data part num
+ * @return scan data part list
*/
     public List<Integer> getScanParts(int totalPart, int totalPartition) {
         List<Integer> scanParts = new ArrayList<>();
-
int currentPart = index + 1;
while (currentPart <= totalPart) {
scanParts.add(currentPart);
currentPart += totalPartition;
}
-
return scanParts;
-
}
}
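
getScanParts distributes Nebula's storage parts across Spark partitions
round-robin: partition i takes parts i+1, i+1+N, i+1+2N, ... for N Spark
partitions. With illustrative numbers (10 storage parts, 3 Spark partitions):

    new NebulaPartition(0).getScanParts(10, 3);  // [1, 4, 7, 10]
    new NebulaPartition(1).getScanParts(10, 3);  // [2, 5, 8]
    new NebulaPartition(2).getScanParts(10, 3);  // [3, 6, 9]

If partitionNumber exceeds the space's part count, the surplus partitions
simply receive empty lists and scan nothing.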
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRDD.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRDD.java
index 80bc21011..bb6654624 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRDD.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRDD.java
@@ -6,10 +6,9 @@
package com.vesoft.nebula.reader;
-import com.vesoft.nebula.bean.ConnectInfo;
-import com.vesoft.nebula.bean.ScanInfo;
+import com.vesoft.nebula.bean.DataSourceConfig;
import com.vesoft.nebula.common.Type;
-import java.util.Map;
+
import org.apache.spark.Partition;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
@@ -24,38 +23,38 @@ public class NebulaRDD extends RDD<Row> {
    private static final ClassTag<Row> ROW_TAG = ClassManifestFactory$.MODULE$.fromClass(Row.class);
- private ConnectInfo connectInfo;
- private ScanInfo scanInfo;
-    private Map<String, Integer> propIndexMap;
+ private DataSourceConfig dataSourceConfig;
/**
- * @param sqlContext sqlContext
- * @param scanInfo scan info
- * @param connectInfo nebula connect info
- * @param propIndexMap label and its properties in schema index map
+ * @param sqlContext sqlContext
+ * @param dataSourceConfig scan info
*/
-    public NebulaRDD(SQLContext sqlContext, ScanInfo scanInfo,
-                     ConnectInfo connectInfo, Map<String, Integer> propIndexMap) {
+ public NebulaRDD(SQLContext sqlContext, DataSourceConfig dataSourceConfig) {
super(sqlContext.sparkContext(), new ArrayBuffer<>(), ROW_TAG);
- this.propIndexMap = propIndexMap;
- this.scanInfo = scanInfo;
- this.connectInfo = connectInfo;
+ this.dataSourceConfig = dataSourceConfig;
}
+    /**
+     * scan vertex or edge data for the given partition
+     *
+     * @param split   the partition to compute
+     * @param context the Spark task context
+     * @return an iterator over the scanned rows
+     */
@Override
    public Iterator<Row> compute(Partition split, TaskContext context) {
- String scanType = scanInfo.getScanType();
- if (Type.VERTEX.getType().equalsIgnoreCase(scanType)) {
- return new ScanVertexIterator(connectInfo, split, scanInfo, propIndexMap);
+ String type = dataSourceConfig.getType();
+ if (Type.VERTEX.getType().equalsIgnoreCase(type)) {
+ return new ScanVertexIterator(split, dataSourceConfig);
} else {
- return new ScanEdgeIterator(connectInfo, split, scanInfo, propIndexMap);
+ return new ScanEdgeIterator(split, dataSourceConfig);
}
}
@Override
public Partition[] getPartitions() {
- int partitionNumber = scanInfo.getPartitionNumber();
+ int partitionNumber = dataSourceConfig.getPartitionNumber();
Partition[] partitions = new Partition[partitionNumber];
for (int i = 0; i < partitionNumber; i++) {
Partition partition = new NebulaPartition(i);
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRelation.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRelation.java
index 71edce695..7ff95c5da 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRelation.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/NebulaRelation.java
@@ -6,16 +6,19 @@
package com.vesoft.nebula.reader;
-import com.vesoft.nebula.bean.ConnectInfo;
-import com.vesoft.nebula.bean.ScanInfo;
+import com.facebook.thrift.TException;
+import com.vesoft.nebula.bean.DataSourceConfig;
+import com.vesoft.nebula.client.meta.MetaClientImpl;
import com.vesoft.nebula.common.Type;
+
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+import com.vesoft.nebula.exception.GraphConnectException;
+import com.vesoft.nebula.util.DataTypeConverter;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -33,64 +36,83 @@ public class NebulaRelation extends BaseRelation implements Serializable, TableScan {
private SQLContext sqlContext;
private StructType schema;
+    private final Map<String, List<StructField>> labelFields = new HashMap<>();
- private ConnectInfo connectInfo;
- private ScanInfo scanInfo;
-    private Map<String, Integer> propIndexMap;
+ private DataSourceConfig dataSourceConfig;
- public NebulaRelation(SQLContext sqlContext, ConnectInfo connectInfo, ScanInfo scanInfo) {
+ public NebulaRelation(SQLContext sqlContext, DataSourceConfig dataSourceConfig) {
this.sqlContext = sqlContext;
- this.connectInfo = connectInfo;
- this.scanInfo = scanInfo;
+ this.dataSourceConfig = dataSourceConfig;
- initParameters(scanInfo);
+ initSchema(dataSourceConfig);
}
- private void initParameters(ScanInfo scanInfo) {
-
-        Set<String> returnPropSet = new HashSet<>();
-        Map<String, List<String>> returnColMap = scanInfo.getReturnColMap();
+    /**
+     * initialize the schema of the result dataset
+     *
+     * @param dataSourceConfig configuration used to fetch the tag or edge schema
+     */
+ private void initSchema(DataSourceConfig dataSourceConfig) {
+        Map<String, List<String>> returnColMap = dataSourceConfig.getReturnColMap();
LOGGER.info("return col map: {}", returnColMap);
-        for (Map.Entry<String, List<String>> returnColEntry : returnColMap.entrySet()) {
-            List<String> returnCols = returnColEntry.getValue();
- for (String returnCol : returnCols) {
- returnPropSet.add(returnCol);
- }
- }
- LOGGER.info("return prop set: {}", returnPropSet);
        List<StructField> fields = new ArrayList<>();
- if (Type.VERTEX.getType().equalsIgnoreCase(scanInfo.getScanType())) {
- fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
- } else {
- fields.add(DataTypes.createStructField("src", DataTypes.StringType, false));
- fields.add(DataTypes.createStructField("dst", DataTypes.StringType, false));
- }
- /*
- spark read result has only one dataset, but scan result contain difference properties,
- so create a schema contain all properties
- */
- for (String returnCol : returnPropSet) {
- fields.add(DataTypes.createStructField(returnCol, DataTypes.StringType, true));
+
+ MetaClientImpl metaClient = new MetaClientImpl(dataSourceConfig.getHostAndPorts());
+ try {
+ metaClient.connect();
+ } catch (TException e) {
+ throw new GraphConnectException(e.getMessage(), e);
}
- schema = DataTypes.createStructType(fields);
- propIndexMap = new HashMap<>();
- for (int i = 0; i < fields.size(); i++) {
- propIndexMap.put(fields.get(i).name(), i);
+        Map<String, Class> schemaColAndType;
+        for (Map.Entry<String, List<String>> returnColEntry : returnColMap.entrySet()) {
+ if (Type.VERTEX.getType().equalsIgnoreCase(dataSourceConfig.getType())) {
+ fields.add(DataTypes.createStructField("_vertexId", DataTypes.StringType, false));
+ schemaColAndType = metaClient.getTagSchema(dataSourceConfig.getNameSpace(), dataSourceConfig.getLabel());
+ } else {
+ fields.add(DataTypes.createStructField("_srcId", DataTypes.StringType, false));
+ fields.add(DataTypes.createStructField("_dstId", DataTypes.StringType, false));
+ schemaColAndType = metaClient.getEdgeSchema(dataSourceConfig.getNameSpace(), dataSourceConfig.getLabel());
+ }
+
+ if (dataSourceConfig.getAllCols()) {
+ // if allCols is true, then fields should contain all properties.
+                for (Map.Entry<String, Class> colTypeEntry : schemaColAndType.entrySet()) {
+ fields.add(DataTypes.createStructField(colTypeEntry.getKey(),
+ DataTypeConverter.convertDataType(colTypeEntry.getValue()),
+ true));
+ }
+ } else {
+ for (String returnCol : returnColEntry.getValue()) {
+ if (schemaColAndType.containsKey(returnCol)) {
+ fields.add(DataTypes.createStructField(returnCol,
+ DataTypeConverter.convertDataType(schemaColAndType.get(returnCol)),
+ true));
+ } else {
+ LOGGER.warn("label {} doesn't contain col {}", dataSourceConfig.getLabel(), returnCol);
+ }
+ }
+ }
+
+ labelFields.put(returnColEntry.getKey(), fields);
+ schema = new StructType(fields.toArray(new StructField[fields.size()]));
+ LOGGER.info("return prop set for label {} : {}", returnColEntry.getKey(), fields);
}
- LOGGER.info("propIndexMap: {}", propIndexMap);
}
+ @Override
public SQLContext sqlContext() {
return sqlContext;
}
+ @Override
public StructType schema() {
return schema;
}
+ @Override
    public RDD<Row> buildScan() {
- return new NebulaRDD(sqlContext, scanInfo, connectInfo, propIndexMap);
+ return new NebulaRDD(sqlContext, dataSourceConfig);
}
}
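
initSchema above delegates the type mapping to DataTypeConverter.convertDataType,
whose diff falls outside this excerpt. A plausible shape, assuming the meta
client reports property types as Java classes (a sketch, not the actual file):

    // Sketch only -- the real DataTypeConverter added by this patch is not shown here.
    public class DataTypeConverter {
        public static DataType convertDataType(Class clazz) {
            if (clazz == Long.class || clazz == Integer.class) {
                return DataTypes.LongType;
            } else if (clazz == Double.class || clazz == Float.class) {
                return DataTypes.DoubleType;
            } else if (clazz == Boolean.class) {
                return DataTypes.BooleanType;
            }
            return DataTypes.StringType;  // strings and anything unrecognized
        }
    }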
diff --git a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/ScanEdgeIterator.java b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/ScanEdgeIterator.java
index f7dd2c3eb..8ca8b8633 100644
--- a/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/ScanEdgeIterator.java
+++ b/tools/nebula-spark/src/main/java/com/vesoft/nebula/reader/ScanEdgeIterator.java
@@ -6,31 +6,35 @@
package com.vesoft.nebula.reader;
-import com.vesoft.nebula.bean.ConnectInfo;
-import com.vesoft.nebula.bean.ScanInfo;
+import com.vesoft.nebula.bean.DataSourceConfig;
import com.vesoft.nebula.client.storage.processor.ScanEdgeProcessor;
import com.vesoft.nebula.data.Property;
import com.vesoft.nebula.data.Result;
+import com.vesoft.nebula.data.Row;
import com.vesoft.nebula.storage.ScanEdgeResponse;
+
import java.io.IOException;
+import java.util.Map;
+import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+
import org.apache.spark.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ScanEdgeIterator extends AbstractNebulaIterator {
- private Logger logger = LoggerFactory.getLogger(ScanEdgeIterator.class);
+ private final Logger LOGGER = LoggerFactory.getLogger(ScanEdgeIterator.class);
    private Iterator<ScanEdgeResponse> responseIterator;
- public ScanEdgeIterator(ConnectInfo connectInfo, Partition split,
-                            ScanInfo scanInfo, Map<String, Integer> propIndexMap) {
- super(connectInfo, split, scanInfo, propIndexMap);
- processor = new ScanEdgeProcessor(metaClient);
+ private DataSourceConfig dataSourceConfig;
+
+ public ScanEdgeIterator(Partition split,
+ DataSourceConfig dataSourceConfig) {
+ super(split, dataSourceConfig);
+ this.dataSourceConfig = dataSourceConfig;
}
@Override
@@ -43,22 +47,22 @@ public boolean hasNext() {
if (responseIterator == null || !responseIterator.hasNext()) {
if (scanPartIterator.hasNext()) {
try {
- responseIterator = storageClient.scanEdge(connectInfo.getSpaceName(),
- scanPartIterator.next(), returnCols,
- false, 1000, 0L, Long.MAX_VALUE);
+ responseIterator = storageClient.scanEdge(dataSourceConfig.getNameSpace(),
+ scanPartIterator.next(), returnCols, dataSourceConfig.getAllCols(),
+ 1000, 0L, Long.MAX_VALUE);
} catch (IOException e) {
- logger.error(e.getMessage(), e);
+ LOGGER.error(e.getMessage(), e);
}
continue;
}
break;
- } else if (responseIterator.hasNext()) {
+ } else {
ScanEdgeResponse next = responseIterator.next();
if (next != null) {
- Result processResult = processor.process(connectInfo.getSpaceName(), next);
+ processor = new ScanEdgeProcessor(metaClient);
+ Result processResult = processor.process(dataSourceConfig.getNameSpace(), next);
dataIterator = process(processResult);
}
- continue;
}
}
if (dataIterator == null) {
@@ -68,26 +72,24 @@ public boolean hasNext() {
}
@Override
-    protected Iterator<String[]> process(Result result) {
-        List<String[]> resultValues = new ArrayList<>();
-
-        Map<String, List<com.vesoft.nebula.data.Row>> dataMap = result.getRows();
-        for (Map.Entry<String, List<com.vesoft.nebula.data.Row>> dataEntry : dataMap.entrySet()) {
+    protected Iterator<String> process(Result result) {
+        Map<String, List<Row>> dataMap = result.getRows();
+        for (Map.Entry<String, List<Row>> dataEntry : dataMap.entrySet()) {
String labelName = dataEntry.getKey();
-            List<Integer> propIndexs = labelPropIndexMap.get(labelName);
-            List<com.vesoft.nebula.data.Row> rows = dataEntry.getValue();
- for (com.vesoft.nebula.data.Row row : rows) {
-                Iterator<Integer> nameIndexIterator = propIndexs.iterator();
- String[] fields = new String[propSize + 2];
- fields[0] = String.valueOf(row.getDefaultProperties()[0].getValue());
- fields[1] = String.valueOf(row.getDefaultProperties()[2].getValue());
+            List<Row> rows = dataEntry.getValue();
+ for (Row row : rows) {
+ List