Commit

connector for spark 2.2 (#49)
* pyspark example added (#51)

* pyspark example added

* Update README_CN.md

* support delete related edges when delete vertex (#53)

* support delete related edges when delete vertex

* add test

* add example for delete vertex with edge (#54)

* doc: pyspark write example (#55)

* doc: pyspark write example

* Added pyshell calling lines and python file header

discussed in #50
Thanks to @Reid00

* Update README.md

wording

* Update README_CN.md

* Update README.md

* Update README_CN.md

* Update README.md

* Update README_CN.md

* spark2.2 reader initial commit

* spark2.2 reader initial commit

* extract common config for multi spark version

* delete common config files

* extract common config and utils

* remove common test

* spark connector reader for spark 2.2

* spark connector writer for spark 2.2

* revert example

* refactor spark version & close metaProvider after finish writing

* refactor common package name

* fix scan part

* refactor spark version for spark2.2

* connector writer for spark2.2

Co-authored-by: Wey Gu <[email protected]>
Nicole00 and wey-gu authored Aug 31, 2022
1 parent e6392e5 commit e6bad51
Showing 60 changed files with 3,861 additions and 142 deletions.
120 changes: 120 additions & 0 deletions README.md
@@ -141,6 +141,126 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x. If you are us
For more information on usage, please refer to [Example](https://github.com/vesoft-inc/nebula-spark-connector/tree/master/example/src/main/scala/com/vesoft/nebula/examples/connector).
## PySpark with Nebula Spark Connector
Below is an example of calling the nebula-spark-connector jar package in PySpark.
### Read in PySpark
Read from a NebulaGraph whose `metaAddress` is `"metad0:9559"` into a dataframe:
```python
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

You may then `show` the dataframe as follows:

```python
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
```
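
Reading edges works in the same way, with `type` set to `edge`. Below is a minimal sketch, assuming the `follow` edge type with a `degree` property from the same `basketballplayer` example space:

```python
# read an edge type instead of a tag: "type" is "edge" and "label" is the edge type name
df_edge = spark.read.format(
    "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", "basketballplayer").option(
    "label", "follow").option(
    "returnCols", "degree").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()
```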

### Write in PySpark

Let's try a write example. By default, the `writeMode` is `insert`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").save()
```
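
Writing edges follows the same pattern. A minimal sketch, assuming a dataframe `df_edge` whose `srcId`, `dstId` and `degree` columns (hypothetical names, not from the original example) are written to the `follow` edge type:

```python
# write edges: "type" is "edge"; source and target vertex id columns are mapped
# through "srcVertexField" / "dstVertexField" (see the option list below)
df_edge.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", "basketballplayer").option(
    "label", "follow").option(
    "srcVertexField", "srcId").option(
    "dstVertexField", "dstId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "passwd", "nebula").option(
    "user", "root").save()
```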

For the delete or update write modes, we can, for instance, set `writeMode` to `delete`:
```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"writeMode", "delete").option(
"user", "root").save()
```
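
Likewise, `writeMode` can be set to `update` to update properties of existing vertices; a minimal sketch under the same assumptions as the insert example above:

```python
# identical to the insert example, except "writeMode" is set to "update"
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vidPolicy", "").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "passwd", "nebula").option(
    "writeMode", "update").option(
    "user", "root").save()
```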

### Options in PySpark

For more options, e.g. deleting related edges when a vertex is deleted, refer to [nebula/connector/NebulaOptions.scala](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala); there you can see that the option is named `deleteEdge`.

```scala
/** write config */
val RATE_LIMIT: String = "rateLimit"
val VID_POLICY: String = "vidPolicy"
val SRC_POLICY: String = "srcPolicy"
val DST_POLICY: String = "dstPolicy"
val VERTEX_FIELD = "vertexField"
val SRC_VERTEX_FIELD = "srcVertexField"
val DST_VERTEX_FIELD = "dstVertexField"
val RANK_FIELD = "rankFiled"
val BATCH: String = "batch"
val VID_AS_PROP: String = "vidAsProp"
val SRC_AS_PROP: String = "srcAsProp"
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
```
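
For example, a vertex delete that also removes the related edges might look like the sketch below; this assumes `deleteEdge` is passed as the string `"true"` like the other options, and that it takes effect together with the `delete` write mode:

```python
# delete vertices and, with "deleteEdge" enabled, their related edges as well
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "vertexField", "_vertexId").option(
    "batch", 1).option(
    "metaAddress", "metad0:9559").option(
    "graphAddress", "graphd1:9669").option(
    "passwd", "nebula").option(
    "user", "root").option(
    "writeMode", "delete").option(
    "deleteEdge", "true").save()
```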

### Call Nebula Spark Connector in PySpark shell and .py file

Below are examples of how to run the above code in the PySpark shell or in a Python file:

- Call with PySpark shell:

```bash
/spark/bin/pyspark --driver-class-path nebula-spark-connector-3.0.0.jar --jars nebula-spark-connector-3.0.0.jar
```

- In Python code:

```python
from pyspark.sql import SparkSession

# register the connector jar with the session via the "spark.jars" property
spark = SparkSession.builder.config(
    "spark.jars",
    "/path_to/nebula-spark-connector-3.0.0.jar").appName(
    "nebula-connector").getOrCreate()
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

## Version match

The version correspondence between Nebula Spark Connector and Nebula is as follows:
119 changes: 119 additions & 0 deletions README_CN.md
@@ -143,6 +143,125 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x. If you are
For more usage examples, please refer to [Example](https://github.com/vesoft-inc/nebula-spark-connector/tree/master/example/src/main/scala/com/vesoft/nebula/examples/connector).
## Using Nebula Spark Connector in PySpark
### Reading from NebulaGraph in PySpark
Read all data under a tag from the NebulaGraph whose `metaAddress` is `"metad0:9559"` into a dataframe:
```python
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

You can then `show` the dataframe like this:

```python
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
```

### Writing to NebulaGraph in PySpark

Now let's try a write example. By default, when not specified, the `writeMode` is `insert`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").save()
```
To use the non-default `delete` or `update` write modes, add the `writeMode` option, for example `delete`:

```python
df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"writeMode", "delete").option(
"user", "root").save()
```

### Options for reading and writing in PySpark

For other options, such as `withDeleteEdge` when deleting vertices, refer to the string option definitions in [nebula/connector/NebulaOptions.scala](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala); there you can see that its option string is `deleteEdge`.

```scala
/** write config */
val RATE_LIMIT: String = "rateLimit"
val VID_POLICY: String = "vidPolicy"
val SRC_POLICY: String = "srcPolicy"
val DST_POLICY: String = "dstPolicy"
val VERTEX_FIELD = "vertexField"
val SRC_VERTEX_FIELD = "srcVertexField"
val DST_VERTEX_FIELD = "dstVertexField"
val RANK_FIELD = "rankFiled"
val BATCH: String = "batch"
val VID_AS_PROP: String = "vidAsProp"
val SRC_AS_PROP: String = "srcAsProp"
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
```

### How to call Nebula Spark Connector in PySpark

Finally, here are examples of calling the Spark Connector from the PySpark shell and from Python code:

- Call with PySpark shell:

```bash
/spark/bin/pyspark --driver-class-path nebula-spark-connector-3.0.0.jar --jars nebula-spark-connector-3.0.0.jar
```

- In Python code:

```python
from pyspark.sql import SparkSession

# register the connector jar with the session via the "spark.jars" property
spark = SparkSession.builder.config(
    "spark.jars",
    "/path_to/nebula-spark-connector-3.0.0.jar").appName(
    "nebula-connector").getOrCreate()
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
```

## Version match
The version correspondence between Nebula Spark Connector and Nebula is as follows:

@@ -205,6 +205,9 @@ object NebulaSparkWriterExample
.withUser("root")
.withPasswd("nebula")
.withWriteMode(WriteMode.DELETE)
// set deleteEdge to true so that related edges are deleted when the vertex is deleted
// refer https://docs.nebula-graph.com.cn/master/3.ngql-guide/12.vertex-statements/4.delete-vertex/#_1
.withDeleteEdge(true)
.build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
}