connector for spark 2.2 #49

Merged 18 commits on Aug 31, 2022
README.md (120 additions, 0 deletions)

@@ -141,6 +141,126 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x.

For more information on usage, please refer to [Example](https://github.com/vesoft-inc/nebula-spark-connector/tree/master/example/src/main/scala/com/vesoft/nebula/examples/connector).

## PySpark with Nebula Spark Connector

Below is an example of calling the nebula-spark-connector jar package from PySpark.

### Read in PySpark

Read vertex data from NebulaGraph, with a `metaAddress` of `"metad0:9559"`, into a DataFrame:

```python
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

You may then `show` the DataFrame as follows:

```python
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
```
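
Reading edges follows the same pattern. The sketch below assumes the `follow` edge type with a `degree` property from the same demo space; adjust `label` and `returnCols` to your own schema:

```python
df = spark.read.format(
    "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(              # "edge" instead of "vertex"
    "spaceName", "basketballplayer").option(
    "label", "follow").option(           # assumed edge type of the demo dataset
    "returnCols", "degree").option(      # assumed edge property
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
```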

### Write in PySpark

Let's try a write example. By default, the `writeMode` is `insert`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").save()
```

For the delete or update write modes, we can specify `writeMode` explicitly, for instance as `delete`:
```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"writeMode", "delete").option(
"user", "root").save()
```
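
The `update` mode works the same way; only the `writeMode` value changes (a sketch with the same connection settings as above):

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vidPolicy", "").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "passwd", "nebula").option(
    "writeMode", "update").option(  # update existing vertices instead of inserting
    "user", "root").save()
```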

### Options in PySpark

For more options, for example deleting the related edges when a vertex is deleted, refer to [nebula/connector/NebulaOptions.scala](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala); there we can see that the option is named `deleteEdge`.

```scala
/** write config */
val RATE_LIMIT: String = "rateLimit"
val VID_POLICY: String = "vidPolicy"
val SRC_POLICY: String = "srcPolicy"
val DST_POLICY: String = "dstPolicy"
val VERTEX_FIELD = "vertexField"
val SRC_VERTEX_FIELD = "srcVertexField"
val DST_VERTEX_FIELD = "dstVertexField"
val RANK_FIELD = "rankFiled" // note: the key is spelled "rankFiled" in the upstream source
val BATCH: String = "batch"
val VID_AS_PROP: String = "vidAsProp"
val SRC_AS_PROP: String = "srcAsProp"
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
```
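
So, deleting vertices together with their attached edges from PySpark would look like the following sketch (the flag is passed as the string `"true"`, mirroring the other options; not verified here):

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "user", "root").option(
    "passwd", "nebula").option(
    "writeMode", "delete").option(
    "deleteEdge", "true").save()  # also delete the edges attached to each vertex
```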

### Calling Nebula Spark Connector from the PySpark shell and from a .py file

Below are examples of how to run the above code in the PySpark shell or from a Python file:

- Call with PySpark shell:

```bash
/spark/bin/pyspark --driver-class-path nebula-spark-connector-3.0.0.jar --jars nebula-spark-connector-3.0.0.jar
```

- In Python code:

```python
from pyspark.sql import SparkSession

# "spark.jars" ships the connector jar with the job, and
# "spark.driver.extraClassPath" adds it to the driver's classpath,
# mirroring the --jars / --driver-class-path flags shown above.
spark = SparkSession.builder.config(
    "spark.jars",
    "/path_to/nebula-spark-connector-3.0.0.jar").config(
    "spark.driver.extraClassPath",
    "/path_to/nebula-spark-connector-3.0.0.jar").appName(
    "nebula-connector").getOrCreate()

df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```
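
Once the session is up, a quick sanity check is to inspect the schema of the DataFrame that comes back:

```python
df.printSchema()  # expect _vertexId plus the requested returnCols (name, age)
```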

## Version match

The version correspondence between Nebula Spark Connector and Nebula is as follows:
README_CN.md (119 additions, 0 deletions)

@@ -143,6 +143,125 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x.

For more usage examples, please refer to [Example](https://github.com/vesoft-inc/nebula-spark-connector/tree/master/example/src/main/scala/com/vesoft/nebula/examples/connector).

## Using Nebula Spark Connector in PySpark

### Reading NebulaGraph data in PySpark

Read all the data under a tag from a Nebula Graph whose `metaAddress` is `"metad0:9559"` into a DataFrame:

```python
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

You can then `show` this DataFrame like so:

```python
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
```
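
Edges can be read in the same way; a sketch assuming the `follow` edge type with a `degree` property from the same demo space:

```python
df = spark.read.format(
    "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(              # "edge" instead of "vertex"
    "spaceName", "basketballplayer").option(
    "label", "follow").option(           # assumed edge type of the demo dataset
    "returnCols", "degree").option(      # assumed edge property
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
```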

### Writing data to NebulaGraph in PySpark

Now a write example. When not specified, `writeMode` defaults to `insert`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").save()
```

To use a non-default write mode such as `delete` or `update`, add the `writeMode` option, for example for `delete`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"writeMode", "delete").option(
"user", "root").save()
```
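
The `update` mode is analogous; only the `writeMode` value changes (a sketch with the same connection settings):

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vidPolicy", "").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "passwd", "nebula").option(
    "writeMode", "update").option(  # update existing vertices instead of inserting
    "user", "root").save()
```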

### Options for reading and writing in PySpark

For other options, such as `withDeleteEdge` when deleting vertices, refer to the option-key definitions in [nebula/connector/NebulaOptions.scala](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala); there we can see that its string key is `deleteEdge`:

```scala
/** write config */
val RATE_LIMIT: String = "rateLimit"
val VID_POLICY: String = "vidPolicy"
val SRC_POLICY: String = "srcPolicy"
val DST_POLICY: String = "dstPolicy"
val VERTEX_FIELD = "vertexField"
val SRC_VERTEX_FIELD = "srcVertexField"
val DST_VERTEX_FIELD = "dstVertexField"
val RANK_FIELD = "rankFiled" // note: the key is spelled "rankFiled" in the upstream source
val BATCH: String = "batch"
val VID_AS_PROP: String = "vidAsProp"
val SRC_AS_PROP: String = "srcAsProp"
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
```
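
So, deleting vertices together with their attached edges from PySpark would look like this sketch (the flag is passed as the string `"true"`; not verified here):

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "user", "root").option(
    "passwd", "nebula").option(
    "writeMode", "delete").option(
    "deleteEdge", "true").save()  # also delete the edges attached to each vertex
```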

### How to call Nebula Spark Connector in PySpark

Finally, here are examples of calling the Spark Connector from the PySpark shell and from Python code:

- Call with PySpark shell:

```bash
/spark/bin/pyspark --driver-class-path nebula-spark-connector-3.0.0.jar --jars nebula-spark-connector-3.0.0.jar
```

- In Python code:

```python
from pyspark.sql import SparkSession

# "spark.jars" ships the connector jar with the job, and
# "spark.driver.extraClassPath" adds it to the driver's classpath,
# mirroring the --jars / --driver-class-path flags shown above.
spark = SparkSession.builder.config(
    "spark.jars",
    "/path_to/nebula-spark-connector-3.0.0.jar").config(
    "spark.driver.extraClassPath",
    "/path_to/nebula-spark-connector-3.0.0.jar").appName(
    "nebula-connector").getOrCreate()

df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

## Version match

The version correspondence between Nebula Spark Connector and Nebula is as follows:

@@ -205,6 +205,9 @@ object NebulaSparkWriterExample {

```scala
.withUser("root")
.withPasswd("nebula")
.withWriteMode(WriteMode.DELETE)
// setting deleteEdge to true deletes the related edges when a vertex is deleted
// see https://docs.nebula-graph.com.cn/master/3.ngql-guide/12.vertex-statements/4.delete-vertex/#_1
.withDeleteEdge(true)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
}
```