This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

sync new features
Nicole00 committed Jul 28, 2021
1 parent b5f60c1 commit 0335a41
Showing 16 changed files with 82 additions and 57 deletions.
6 changes: 3 additions & 3 deletions example/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <artifactId>nebula-spark</artifactId>
     <groupId>com.vesoft</groupId>
-    <version>2.0-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -117,7 +117,7 @@
     <dependency>
       <groupId>com.vesoft</groupId>
       <artifactId>nebula-spark-connector</artifactId>
-      <version>2.0-SNAPSHOT</version>
+      <version>2.1.0</version>
     </dependency>
   </dependencies>
-</project>
+</project>
6 changes: 3 additions & 3 deletions nebula-algorithm/pom.xml
@@ -6,7 +6,7 @@
   <parent>
     <artifactId>nebula-spark</artifactId>
     <groupId>com.vesoft</groupId>
-    <version>2.0-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -15,7 +15,7 @@
 
   <properties>
     <spark.version>2.4.4</spark.version>
-    <nebula.version>2.0-SNAPSHOT</nebula.version>
+    <nebula.version>2.1.0</nebula.version>
     <config.version>1.4.0</config.version>
     <scopt.version>3.7.1</scopt.version>
     <scalatest.version>3.2.0</scalatest.version>
@@ -288,4 +288,4 @@
   </plugins>
 </build>
 
-</project>
+</project>
4 changes: 2 additions & 2 deletions nebula-exchange/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <artifactId>nebula-spark</artifactId>
     <groupId>com.vesoft</groupId>
-    <version>2.0-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -28,7 +28,7 @@
     <scala-logging.version>3.9.2</scala-logging.version>
     <scala-xml.version>2.11.0-M4</scala-xml.version>
     <scopt.version>3.7.1</scopt.version>
-    <nebula.version>2.0.0-SNAPSHOT</nebula.version>
+    <nebula.version>2.0.0</nebula.version>
     <s2.version>1.0.0</s2.version>
     <neo.version>2.4.5-M1</neo.version>
     <gremlin.version>3.4.6</gremlin.version>
@@ -212,7 +212,6 @@ object Exchange {
       LOG.info(s"batchFailure.reimport: ${batchFailure.value}")
     }
     spark.close()
-    sys.exit(0)
   }
 
   /**
@@ -47,11 +47,11 @@ class GraphProvider(addresses: List[HostAndPort], timeout: Int)
     pool.close()
   }
 
-  def switchSpace(session: Session, space: String): Boolean = {
+  def switchSpace(session: Session, space: String): ResultSet = {
     val switchStatment = s"use $space"
     LOG.info(s"switch space $space")
     val result = submit(session, switchStatment)
-    result.isSucceeded
+    result
   }
 
   def submit(session: Session, statement: String): ResultSet = {
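Note on the hunk above: switchSpace now returns the full ResultSet instead of a bare Boolean, so callers can surface the server's error message on failure. A minimal caller-side sketch, assuming the nebula-java client's ResultSet API (isSucceeded, getErrorMessage) and a hypothetical graphProvider/session already in scope:

    // `graphProvider` and `session` are assumed to be set up elsewhere (hypothetical).
    val result = graphProvider.switchSpace(session, "test_space")
    if (!result.isSucceeded) {
      // getErrorMessage comes from the client's ResultSet, so the failure reason survives
      throw new RuntimeException(s"switch space failed: ${result.getErrorMessage}")
    }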
@@ -51,7 +51,7 @@ class MetaProvider(addresses: List[HostAndPort], timeout: Int, retry: Int)
 
     val columns = tagSchema.getColumns
     for (colDef <- columns.asScala) {
-      schema.put(new String(colDef.getName), colDef.getType.getType.getValue)
+      schema.put(new String(colDef.getName), colDef.getType.getType)
     }
     schema.toMap
   }
@@ -62,7 +62,7 @@
 
     val columns = edgeSchema.getColumns
     for (colDef <- columns.asScala) {
-      schema.put(new String(colDef.getName), colDef.getType.getType.getValue)
+      schema.put(new String(colDef.getName), colDef.getType.getType)
     }
     schema.toMap
   }
@@ -44,7 +44,7 @@ trait Processor extends Serializable {
 
     if (row.isNullAt(index)) return null
 
-    PropertyType.findByValue(fieldTypeMap(field)) match {
+    fieldTypeMap(field) match {
       case PropertyType.STRING | PropertyType.FIXED_STRING => {
         var value = row.get(index).toString
         if (value.equals(DEFAULT_EMPTY_VALUE)) {
@@ -76,7 +76,7 @@
       return nullVal
     }
 
-    PropertyType.findByValue(fieldTypeMap(field)) match {
+    fieldTypeMap(field) match {
       case PropertyType.UNKNOWN =>
        throw new IllegalArgumentException("date type in nebula is UNKNOWN.")
       case PropertyType.STRING | PropertyType.FIXED_STRING => {
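The two hunks above are the consumer side of the MetaProvider change: the schema map now carries the PropertyType enum itself rather than its integer value, so the extra PropertyType.findByValue round-trip disappears. A before/after sketch of the pattern, assuming the thrift-generated PropertyType enum from the nebula client (not the project's exact code):

    import com.vesoft.nebula.meta.PropertyType

    // Before: the map held raw ints, so every match needed a findByValue lookup.
    val oldSchema: Map[String, Int] = Map("name" -> PropertyType.STRING.getValue)
    PropertyType.findByValue(oldSchema("name")) match {
      case PropertyType.STRING => // handle a string column
      case _                   => // other types
    }

    // After: the map holds the enum, and the match is direct.
    val newSchema: Map[String, PropertyType] = Map("name" -> PropertyType.STRING)
    newSchema("name") match {
      case PropertyType.STRING => // handle a string column
      case _                   => // other types
    }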
@@ -10,7 +10,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.google.common.base.Optional
 import com.google.common.util.concurrent.{FutureCallback, RateLimiter}
-import com.vesoft.nebula.ErrorCode
+import com.vesoft.nebula.graph.ErrorCode
 import com.vesoft.nebula.exchange.config.{
   ConnectionConfigEntry,
   DataBaseConfigEntry,
@@ -137,9 +137,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry,
 
   def prepare(): Unit = {
     val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space)
-    if (!switchResult) {
+    if (!switchResult.isSucceeded) {
       this.close()
-      throw new RuntimeException("Switch Failed")
+      throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage)
     }
 
     LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}")
@@ -206,15 +206,15 @@ class NebulaWriterCallback(latch: CountDownLatch,
 
   override def onSuccess(results: java.util.List[Optional[Integer]]): Unit = {
     if (pathAndOffset.isDefined) {
-      if (results.asScala.forall(_.get() == ErrorCode.SUCCEEDED.getValue))
+      if (results.asScala.forall(_.get() == ErrorCode.SUCCEEDED))
         HDFSUtils.saveContent(pathAndOffset.get._1, pathAndOffset.get._2.toString)
       else
         throw new RuntimeException(
-          s"Some error code: ${results.asScala.filter(_.get() != ErrorCode.SUCCEEDED.getValue).head} appear")
+          s"Some error code: ${results.asScala.filter(_.get() != ErrorCode.SUCCEEDED).head} appear")
     }
     for (result <- results.asScala) {
       latch.countDown()
-      if (result.get() == ErrorCode.SUCCEEDED.getValue) {
+      if (result.get() == ErrorCode.SUCCEEDED) {
         batchSuccess.add(1)
       } else {
         LOG.error(s"batch insert error with code ${result.get()}, batch size is ${results.size()}")
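With the client pinned to 2.0.0, ErrorCode moves back to the com.vesoft.nebula.graph package and the callback compares returned codes against ErrorCode.SUCCEEDED directly instead of unwrapping .getValue. A pared-down sketch of the callback pattern, assuming ErrorCode.SUCCEEDED is the client's integer success constant in that release (not the project's exact code):

    import java.util.concurrent.CountDownLatch
    import com.google.common.base.Optional
    import com.google.common.util.concurrent.FutureCallback
    import com.vesoft.nebula.graph.ErrorCode
    import scala.collection.JavaConverters._

    class BatchCallback(latch: CountDownLatch)
        extends FutureCallback[java.util.List[Optional[Integer]]] {

      override def onSuccess(results: java.util.List[Optional[Integer]]): Unit =
        for (result <- results.asScala) {
          latch.countDown()
          if (result.get() != ErrorCode.SUCCEEDED) {
            // only logged here; the real writer also counts the batch for re-import
            println(s"batch insert failed with code ${result.get()}")
          }
        }

      override def onFailure(t: Throwable): Unit = {
        latch.countDown()
        t.printStackTrace()
      }
    }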
@@ -56,20 +56,20 @@ class ProcessorSuite extends Processor {
       ))
     val row = new GenericRowWithSchema(values.toArray, schema)
     val map = Map(
-      "col1" -> PropertyType.STRING.getValue,
-      "col2" -> PropertyType.FIXED_STRING.getValue,
-      "col3" -> PropertyType.INT8.getValue,
-      "col4" -> PropertyType.INT16.getValue,
-      "col5" -> PropertyType.INT32.getValue,
-      "col6" -> PropertyType.INT64.getValue,
-      "col7" -> PropertyType.DATE.getValue,
-      "col8" -> PropertyType.DATETIME.getValue,
-      "col9" -> PropertyType.TIME.getValue,
-      "col10" -> PropertyType.TIMESTAMP.getValue,
-      "col11" -> PropertyType.BOOL.getValue,
-      "col12" -> PropertyType.DOUBLE.getValue,
-      "col13" -> PropertyType.FLOAT.getValue,
-      "col14" -> PropertyType.STRING.getValue
+      "col1" -> PropertyType.STRING,
+      "col2" -> PropertyType.FIXED_STRING,
+      "col3" -> PropertyType.INT8,
+      "col4" -> PropertyType.INT16,
+      "col5" -> PropertyType.INT32,
+      "col6" -> PropertyType.INT64,
+      "col7" -> PropertyType.DATE,
+      "col8" -> PropertyType.DATETIME,
+      "col9" -> PropertyType.TIME,
+      "col10" -> PropertyType.TIMESTAMP,
+      "col11" -> PropertyType.BOOL,
+      "col12" -> PropertyType.DOUBLE,
+      "col13" -> PropertyType.FLOAT,
+      "col14" -> PropertyType.STRING
     )
 
   @Test
@@ -89,18 +89,18 @@ class NebulaUtilsSuite {
 
     val map: Map[String, Int] =
       NebulaUtils.getDataSourceFieldType(sourceConfig, space, metaProvider)
-    assert(map("col1") == PropertyType.STRING.getValue)
-    assert(map("col2") == PropertyType.FIXED_STRING.getValue)
-    assert(map("col3") == PropertyType.INT8.getValue)
-    assert(map("col4") == PropertyType.INT16.getValue)
-    assert(map("col5") == PropertyType.INT32.getValue)
-    assert(map("col6") == PropertyType.INT64.getValue)
-    assert(map("col7") == PropertyType.DATE.getValue)
-    assert(map("col8") == PropertyType.DATETIME.getValue)
-    assert(map("col9") == PropertyType.TIMESTAMP.getValue)
-    assert(map("col10") == PropertyType.BOOL.getValue)
-    assert(map("col11") == PropertyType.DOUBLE.getValue)
-    assert(map("col12") == PropertyType.FLOAT.getValue)
+    assert(map("col1") == PropertyType.STRING)
+    assert(map("col2") == PropertyType.FIXED_STRING)
+    assert(map("col3") == PropertyType.INT8)
+    assert(map("col4") == PropertyType.INT16)
+    assert(map("col5") == PropertyType.INT32)
+    assert(map("col6") == PropertyType.INT64)
+    assert(map("col7") == PropertyType.DATE)
+    assert(map("col8") == PropertyType.DATETIME)
+    assert(map("col9") == PropertyType.TIMESTAMP)
+    assert(map("col10") == PropertyType.BOOL)
+    assert(map("col11") == PropertyType.DOUBLE)
+    assert(map("col12") == PropertyType.FLOAT)
   }
 
   @Test
29 changes: 26 additions & 3 deletions nebula-spark-connector/pom.xml
@@ -5,7 +5,7 @@
   <parent>
     <artifactId>nebula-spark</artifactId>
     <groupId>com.vesoft</groupId>
-    <version>2.0-SNAPSHOT</version>
+    <version>2.1.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -14,7 +14,7 @@
 
   <properties>
     <spark.version>2.4.4</spark.version>
-    <nebula.version>2.0.0-SNAPSHOT</nebula.version>
+    <nebula.version>2.0.0</nebula.version>
     <compiler.source.version>1.8</compiler.source.version>
     <compiler.target.version>1.8</compiler.target.version>
     <scalatest.version>3.2.3</scalatest.version>
@@ -151,6 +151,29 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>**/*Test.*</include>
+            <include>**/*Suite.*</include>
+          </includes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>2.0.0</version>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-javadoc-plugin</artifactId>
@@ -179,4 +202,4 @@
     </plugins>
   </build>
 
-</project>
+</project>
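The new surefire and scalatest-maven-plugin entries wire both **/*Test.* and **/*Suite.* classes into the Maven test phase, so mvn test now exercises the Scala suites as well. And with the SNAPSHOT versions replaced by release versions, downstream projects can pull the published connector instead of building it locally. An sbt sketch, assuming the 2.1.0 artifact is available on Maven Central under these coordinates:

    // build.sbt — hedged sketch, assuming the released artifact is published
    libraryDependencies += "com.vesoft" % "nebula-spark-connector" % "2.1.0"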
@@ -226,6 +226,8 @@ object WriteNebulaVertexConfig {
         || vidPolicy.equalsIgnoreCase(KeyPolicy.UUID.toString),
       "config vidPolicy is illegal, please don't set vidPolicy or set vidPolicy \"HASH\" or \"UUID\""
     )
+    assert(user != null && !user.isEmpty, "user is empty")
+    assert(passwd != null && !passwd.isEmpty, "passwd is empty")
     try {
       WriteMode.withName(writeMode.toLowerCase())
     } catch {
@@ -442,6 +444,8 @@ object WriteNebulaEdgeConfig {
       "config dstPolicy is illegal, please don't set dstPolicy or set dstPolicy \"HASH\" or \"UUID\""
     )
     assert(batch > 0, s"config batch must be positive, your batch is $batch.")
+    assert(user != null && !user.isEmpty, "user is empty")
+    assert(passwd != null && !passwd.isEmpty, "passwd is empty")
     try {
       WriteMode.withName(writeMode.toLowerCase)
     } catch {
@@ -478,7 +482,6 @@ class ReadNebulaConfig(space: String,
   def getNoColumn     = noColumn
   def getPartitionNum = partitionNum
   def getLimit        = limit
-  // todo add filter
 }
 
 /**
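The new assertions make user and passwd mandatory when a write config is built, failing fast instead of surfacing an authentication error at write time. A minimal sketch of building a vertex write config, assuming the connector's builder exposes withUser/withPasswd alongside the documented withSpace/withTag/withVidField/withBatch:

    import com.vesoft.nebula.connector.WriteNebulaVertexConfig

    val writeConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("test")
      .withTag("person")
      .withVidField("id")
      .withUser("root")     // asserted non-empty as of this change
      .withPasswd("nebula") // asserted non-empty as of this change
      .withBatch(512)
      .build()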
@@ -47,7 +47,7 @@ class MetaProvider(addresses: List[Address]) extends AutoCloseable {
 
     val columns = tagSchema.getColumns
     for (colDef <- columns.asScala) {
-      schema.put(new String(colDef.getName), colDef.getType.getType.getValue)
+      schema.put(new String(colDef.getName), colDef.getType.getType)
     }
     schema.toMap
   }
@@ -58,7 +58,7 @@
 
     val columns = edgeSchema.getColumns
     for (colDef <- columns.asScala) {
-      schema.put(new String(colDef.getName), colDef.getType.getType.getValue)
+      schema.put(new String(colDef.getName), colDef.getType.getType)
     }
     schema.toMap
   }
@@ -142,7 +142,7 @@ object NebulaExecutor {
       val propValue = record.get(index, types(index))
 
       val fieldName = schema.fields(index).name
-      PropertyType.findByValue(fieldTypeMap(fieldName)) match {
+      fieldTypeMap(fieldName).toInt match {
        case PropertyType.STRING | PropertyType.FIXED_STRING =>
          NebulaUtils.escapeUtil(propValue.toString).mkString("\"", "", "\"")
        case PropertyType.DATE => "date(\"" + propValue + "\")"
@@ -186,7 +186,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll {
       NebulaExecutor.toUpdateExecuteStatement("person", propNames, nebulaVertex)
     val expectVertexUpdate =
       "UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"name\",`col_fixed_string`=\"name\"," +
-        "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;"
+        "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12"
     assert(expectVertexUpdate.equals(updateVertexStatement))
   }
 
@@ -209,7 +209,7 @@
     val expectEdgeUpdate =
       "UPDATE EDGE ON `friend` \"source\"->\"target\"@0 SET `col_string`=\"name\"," +
         "`col_fixed_string`=\"name\",`col_bool`=true,`col_int`=10,`col_int64`=100," +
-        "`col_double`=1.0,`col_date`=2021-11-12;"
+        "`col_double`=1.0,`col_date`=2021-11-12"
     assert(expectEdgeUpdate.equals(updateEdgeStatement))
   }
 }
2 changes: 1 addition & 1 deletion pom.xml
@@ -7,7 +7,7 @@
   <groupId>com.vesoft</groupId>
   <artifactId>nebula-spark</artifactId>
   <packaging>pom</packaging>
-  <version>2.0-SNAPSHOT</version>
+  <version>2.1.0</version>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
