Skip to content

Commit

Permalink
with storage address
Browse files Browse the repository at this point in the history
now we could manully override storage addresses
instead of relying on meta
  • Loading branch information
wey-gu committed May 7, 2022
1 parent e6392e5 commit 1422a7e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,31 @@ object NebulaSparkReaderExample {
println("vertex count: " + vertex.count())
}
}
/**
* read Nebula vertex with storaged addresses manually specified.
*/

def readVertexGraph(spark: SparkSession): Unit = {
LOG.info("start to read graphx vertex")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withStorageAddress("127.0.0.1:9779")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withLimit(10)
.withPartitionNum(10)
.build()

val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
LOG.info("vertex rdd first record: " + vertexRDD.first())
LOG.info("vertex rdd count: {}", vertexRDD.count())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.mutable.ListBuffer

class NebulaConnectionConfig(metaAddress: String,
storageAddress: String = "",
graphAddress: String,
timeout: Int,
connectionRetry: Int,
Expand All @@ -23,6 +24,7 @@ class NebulaConnectionConfig(metaAddress: String,
selfSignParam: SelfSSLSignParams)
extends Serializable {
def getMetaAddress = metaAddress
def getStorageAddress = storageAddress
def getGraphAddress = graphAddress
def getTimeout = timeout
def getConnectionRetry = connectionRetry
Expand All @@ -44,6 +46,7 @@ object NebulaConnectionConfig {
private val LOG = LoggerFactory.getLogger(this.getClass)

protected var metaAddress: String = _
protected var storageAddress: String = ""
protected var graphAddress: String = _
protected var timeout: Int = 6000
protected var connectionRetry: Int = 1
Expand All @@ -64,6 +67,14 @@ object NebulaConnectionConfig {
this
}

/**
* set nebula storage server address, multi addresses is split by English comma
*/
def withStorageAddress(storageAddress: String): ConfigBuilder = {
this.storageAddress = storageAddress
this
}

/**
* set nebula graph server address, multi addresses is split by English comma
*/
Expand Down Expand Up @@ -189,6 +200,7 @@ object NebulaConnectionConfig {
def build(): NebulaConnectionConfig = {
check()
new NebulaConnectionConfig(metaAddress,
storageAddress,
graphAddress,
timeout,
connectionRetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
require(parameters.isDefinedAt(META_ADDRESS), s"Option '$META_ADDRESS' is required")
val metaAddress: String = parameters(META_ADDRESS)

/** storageAddress is optional */
val storageAddress: String = parameters.getOrElse(STORAGE_ADDRESS, "")

require(parameters.isDefinedAt(SPACE_NAME) && StringUtils.isNotBlank(parameters(SPACE_NAME)),
s"Option '$SPACE_NAME' is required and can not be blank")
val spaceName: String = parameters(SPACE_NAME)
Expand Down Expand Up @@ -188,6 +191,20 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
hostPorts.toList
}

def getStorageAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
if (storageAddress != "" && storageAddress != null) {
storageAddress
.split(",")
.foreach(hostPort => {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
})
}
hostPorts.toList
}

def getGraphAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
graphAddress
Expand All @@ -199,7 +216,6 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
})
hostPorts.toList
}

}

class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMap[String])
Expand All @@ -208,11 +224,12 @@ class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMa
object NebulaOptions {

/** nebula common config */
val SPACE_NAME: String = "spaceName"
val META_ADDRESS: String = "metaAddress"
val GRAPH_ADDRESS: String = "graphAddress"
val TYPE: String = "type"
val LABEL: String = "label"
val SPACE_NAME: String = "spaceName"
val META_ADDRESS: String = "metaAddress"
val STORAGE_ADDRESS: String = "storageAddress"
val GRAPH_ADDRESS: String = "graphAddress"
val TYPE: String = "type"
val LABEL: String = "label"

/** connection config */
val TIMEOUT: String = "timeout"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ package object connector {
.option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn)
.option(NebulaOptions.LIMIT, readConfig.getLimit)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
.option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
Expand Down Expand Up @@ -153,6 +154,7 @@ package object connector {
.option(NebulaOptions.LIMIT, readConfig.getLimit)
.option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
.option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
Expand Down Expand Up @@ -267,6 +269,7 @@ package object connector {
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
Expand Down Expand Up @@ -314,6 +317,7 @@ package object connector {
.option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ abstract class NebulaPartitionReader extends InputPartitionReader[InternalRow] {
this.storageClient = new StorageClient(address.asJava, nebulaOptions.timeout)
}

if (nebulaOptions.getStorageAddress != null && nebulaOptions.getStorageAddress.size > 0) {
val storageAddress: ListBuffer[HostAddress] = new ListBuffer[HostAddress]
for (addr <- nebulaOptions.getStorageAddress) {
storageAddress.append(new HostAddress(addr._1, addr._2))
}
this.storageClient.setStorageAddresses(storageAddress.asJava)
}

if (!storageClient.connect()) {
throw new GraphConnectException("storage connect failed.")
}
Expand Down

0 comments on commit 1422a7e

Please sign in to comment.