From 1422a7e6d235ae261214cf38281829fa063b497f Mon Sep 17 00:00:00 2001 From: wey-gu <1651790+wey-gu@users.noreply.github.com> Date: Fri, 6 May 2022 14:53:10 +0800 Subject: [PATCH] with storage address now we could manually override storage addresses instead of relying on meta --- .../connector/NebulaSparkReaderExample.scala | 28 ++++++++++++++++++ .../nebula/connector/NebulaConfig.scala | 12 ++++++++ .../nebula/connector/NebulaOptions.scala | 29 +++++++++++++++---- .../com/vesoft/nebula/connector/package.scala | 4 +++ .../reader/NebulaPartitionReader.scala | 8 +++++ 5 files changed, 75 insertions(+), 6 deletions(-) diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala index b54cf954..cb488e11 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala @@ -165,3 +165,31 @@ object NebulaSparkReaderExample { println("vertex count: " + vertex.count()) } } + /** + * read Nebula vertex with storaged addresses manually specified. 
+ */ + + def readVertexGraph(spark: SparkSession): Unit = { + LOG.info("start to read graphx vertex") + val config = + NebulaConnectionConfig + .builder() + .withMetaAddress("127.0.0.1:9559") + .withStorageAddress("127.0.0.1:9779") + .withTimeout(6000) + .withConenctionRetry(2) + .build() + val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig + .builder() + .withSpace("test") + .withLabel("person") + .withNoColumn(false) + .withReturnCols(List("birthday")) + .withLimit(10) + .withPartitionNum(10) + .build() + + val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx() + LOG.info("vertex rdd first record: " + vertexRDD.first()) + LOG.info("vertex rdd count: {}", vertexRDD.count()) + } \ No newline at end of file diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala index c470c54e..228ce4b9 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala @@ -11,6 +11,7 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.mutable.ListBuffer class NebulaConnectionConfig(metaAddress: String, + storageAddress: String = "", graphAddress: String, timeout: Int, connectionRetry: Int, @@ -23,6 +24,7 @@ class NebulaConnectionConfig(metaAddress: String, selfSignParam: SelfSSLSignParams) extends Serializable { def getMetaAddress = metaAddress + def getStorageAddress = storageAddress def getGraphAddress = graphAddress def getTimeout = timeout def getConnectionRetry = connectionRetry @@ -44,6 +46,7 @@ object NebulaConnectionConfig { private val LOG = LoggerFactory.getLogger(this.getClass) protected var metaAddress: String = _ + protected var storageAddress: String = "" protected var graphAddress: String = _ protected var timeout: Int = 6000 protected var connectionRetry: Int 
= 1 @@ -64,6 +67,14 @@ object NebulaConnectionConfig { this } + /** + * set nebula storage server address, multi addresses is split by English comma + */ + def withStorageAddress(storageAddress: String): ConfigBuilder = { + this.storageAddress = storageAddress + this + } + /** * set nebula graph server address, multi addresses is split by English comma */ @@ -189,6 +200,7 @@ object NebulaConnectionConfig { def build(): NebulaConnectionConfig = { check() new NebulaConnectionConfig(metaAddress, + storageAddress, graphAddress, timeout, connectionRetry, diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala index 11051a68..00df19fe 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala @@ -96,6 +96,9 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])( require(parameters.isDefinedAt(META_ADDRESS), s"Option '$META_ADDRESS' is required") val metaAddress: String = parameters(META_ADDRESS) + /** storageAddress is optional */ + val storageAddress: String = parameters.getOrElse(STORAGE_ADDRESS, "") + require(parameters.isDefinedAt(SPACE_NAME) && StringUtils.isNotBlank(parameters(SPACE_NAME)), s"Option '$SPACE_NAME' is required and can not be blank") val spaceName: String = parameters(SPACE_NAME) @@ -188,6 +191,20 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])( hostPorts.toList } + def getStorageAddress: List[Address] = { + val hostPorts: ListBuffer[Address] = new ListBuffer[Address] + if (storageAddress != "" && storageAddress != null) { + storageAddress + .split(",") + .foreach(hostPort => { + // check host & port by getting HostAndPort + val addr = HostAndPort.fromString(hostPort) + hostPorts.append((addr.getHostText, addr.getPort)) + }) + } + 
hostPorts.toList + } + def getGraphAddress: List[Address] = { val hostPorts: ListBuffer[Address] = new ListBuffer[Address] graphAddress @@ -199,7 +216,6 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])( }) hostPorts.toList } - } class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMap[String]) @@ -208,11 +224,12 @@ class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMa object NebulaOptions { /** nebula common config */ - val SPACE_NAME: String = "spaceName" - val META_ADDRESS: String = "metaAddress" - val GRAPH_ADDRESS: String = "graphAddress" - val TYPE: String = "type" - val LABEL: String = "label" + val SPACE_NAME: String = "spaceName" + val META_ADDRESS: String = "metaAddress" + val STORAGE_ADDRESS: String = "storageAddress" + val GRAPH_ADDRESS: String = "graphAddress" + val TYPE: String = "type" + val LABEL: String = "label" /** connection config */ val TIMEOUT: String = "timeout" diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala index 59482660..b505d930 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -116,6 +116,7 @@ package object connector { .option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn) .option(NebulaOptions.LIMIT, readConfig.getLimit) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) + .option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry) .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry) @@ -153,6 +154,7 @@ package object connector { .option(NebulaOptions.LIMIT, readConfig.getLimit) 
.option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) + .option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry) .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry) @@ -267,6 +269,7 @@ package object connector { .option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) + .option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry) @@ -314,6 +317,7 @@ package object connector { .option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp) .option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode) .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress) + .option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress) .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress) .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout) .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry) diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala index c96ea39d..cff00109 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartitionReader.scala @@ -95,6 +95,14 @@ abstract class NebulaPartitionReader extends 
InputPartitionReader[InternalRow] { this.storageClient = new StorageClient(address.asJava, nebulaOptions.timeout) } + if (nebulaOptions.getStorageAddress != null && nebulaOptions.getStorageAddress.size > 0) { + val storageAddress: ListBuffer[HostAddress] = new ListBuffer[HostAddress] + for (addr <- nebulaOptions.getStorageAddress) { + storageAddress.append(new HostAddress(addr._1, addr._2)) + } + this.storageClient.setStorageAddresses(storageAddress.asJava) + } + if (!storageClient.connect()) { throw new GraphConnectException("storage connect failed.") }