Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

with storage address #47

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,31 @@ object NebulaSparkReaderExample {
println("vertex count: " + vertex.count())
}
}
/**
* read Nebula vertex with storaged addresses manually specified.
*/

def readVertexGraph(spark: SparkSession): Unit = {
LOG.info("start to read graphx vertex")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("127.0.0.1:9559")
.withStorageAddress("127.0.0.1:9779")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test")
.withLabel("person")
.withNoColumn(false)
.withReturnCols(List("birthday"))
.withLimit(10)
.withPartitionNum(10)
.build()

val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
LOG.info("vertex rdd first record: " + vertexRDD.first())
LOG.info("vertex rdd count: {}", vertexRDD.count())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.mutable.ListBuffer

class NebulaConnectionConfig(metaAddress: String,
storageAddress: String = "",
graphAddress: String,
timeout: Int,
connectionRetry: Int,
Expand All @@ -23,6 +24,7 @@ class NebulaConnectionConfig(metaAddress: String,
selfSignParam: SelfSSLSignParams)
extends Serializable {
def getMetaAddress = metaAddress
def getStorageAddress = storageAddress
def getGraphAddress = graphAddress
def getTimeout = timeout
def getConnectionRetry = connectionRetry
Expand All @@ -44,6 +46,7 @@ object NebulaConnectionConfig {
private val LOG = LoggerFactory.getLogger(this.getClass)

protected var metaAddress: String = _
protected var storageAddress: String = ""
protected var graphAddress: String = _
protected var timeout: Int = 6000
protected var connectionRetry: Int = 1
Expand All @@ -64,6 +67,14 @@ object NebulaConnectionConfig {
this
}

/**
* set nebula storage server address, multi addresses is split by English comma
*/
def withStorageAddress(storageAddress: String): ConfigBuilder = {
this.storageAddress = storageAddress
this
}

/**
* set nebula graph server address, multi addresses is split by English comma
*/
Expand Down Expand Up @@ -189,6 +200,7 @@ object NebulaConnectionConfig {
def build(): NebulaConnectionConfig = {
check()
new NebulaConnectionConfig(metaAddress,
storageAddress,
graphAddress,
timeout,
connectionRetry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
require(parameters.isDefinedAt(META_ADDRESS), s"Option '$META_ADDRESS' is required")
val metaAddress: String = parameters(META_ADDRESS)

/** storageAddress is optional */
val storageAddress: String = parameters.getOrElse(STORAGE_ADDRESS, "")

require(parameters.isDefinedAt(SPACE_NAME) && StringUtils.isNotBlank(parameters(SPACE_NAME)),
s"Option '$SPACE_NAME' is required and can not be blank")
val spaceName: String = parameters(SPACE_NAME)
Expand Down Expand Up @@ -188,6 +191,20 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
hostPorts.toList
}

def getStorageAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
if (storageAddress != "" && storageAddress != null) {
storageAddress
.split(",")
.foreach(hostPort => {
// check host & port by getting HostAndPort
val addr = HostAndPort.fromString(hostPort)
hostPorts.append((addr.getHostText, addr.getPort))
})
}
hostPorts.toList
}

def getGraphAddress: List[Address] = {
val hostPorts: ListBuffer[Address] = new ListBuffer[Address]
graphAddress
Expand All @@ -199,7 +216,6 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
})
hostPorts.toList
}

}

class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMap[String])
Expand All @@ -208,11 +224,12 @@ class NebulaOptionsInWrite(@transient override val parameters: CaseInsensitiveMa
object NebulaOptions {

/** nebula common config */
val SPACE_NAME: String = "spaceName"
val META_ADDRESS: String = "metaAddress"
val GRAPH_ADDRESS: String = "graphAddress"
val TYPE: String = "type"
val LABEL: String = "label"
val SPACE_NAME: String = "spaceName"
val META_ADDRESS: String = "metaAddress"
val STORAGE_ADDRESS: String = "storageAddress"
val GRAPH_ADDRESS: String = "graphAddress"
val TYPE: String = "type"
val LABEL: String = "label"

/** connection config */
val TIMEOUT: String = "timeout"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ package object connector {
.option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn)
.option(NebulaOptions.LIMIT, readConfig.getLimit)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
.option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
Expand Down Expand Up @@ -153,6 +154,7 @@ package object connector {
.option(NebulaOptions.LIMIT, readConfig.getLimit)
.option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
.option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
Expand Down Expand Up @@ -267,6 +269,7 @@ package object connector {
.option(NebulaOptions.VID_AS_PROP, writeConfig.getVidAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
Expand Down Expand Up @@ -314,6 +317,7 @@ package object connector {
.option(NebulaOptions.RANK_AS_PROP, writeConfig.getRankAsProp)
.option(NebulaOptions.WRITE_MODE, writeConfig.getWriteMode)
.option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
.option(NebulaOptions.STORAGE_ADDRESS, connectionConfig.getStorageAddress)
.option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
.option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
.option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ abstract class NebulaPartitionReader extends InputPartitionReader[InternalRow] {
this.storageClient = new StorageClient(address.asJava, nebulaOptions.timeout)
}

if (nebulaOptions.getStorageAddress != null && nebulaOptions.getStorageAddress.size > 0) {
val storageAddress: ListBuffer[HostAddress] = new ListBuffer[HostAddress]
for (addr <- nebulaOptions.getStorageAddress) {
storageAddress.append(new HostAddress(addr._1, addr._2))
}
this.storageClient.setStorageAddresses(storageAddress.asJava)
}

if (!storageClient.connect()) {
throw new GraphConnectException("storage connect failed.")
}
Expand Down