diff --git a/example/pom.xml b/example/pom.xml index ade407f5..09bc124a 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ nebula-spark com.vesoft - 2.0-SNAPSHOT + 2.1.0 ../pom.xml 4.0.0 @@ -117,7 +117,7 @@ com.vesoft nebula-spark-connector - 2.0-SNAPSHOT + 2.1.0 - \ No newline at end of file + diff --git a/nebula-algorithm/pom.xml b/nebula-algorithm/pom.xml index a4b433e4..611c7ffc 100644 --- a/nebula-algorithm/pom.xml +++ b/nebula-algorithm/pom.xml @@ -6,7 +6,7 @@ nebula-spark com.vesoft - 2.0-SNAPSHOT + 2.1.0 ../pom.xml 4.0.0 @@ -15,7 +15,7 @@ 2.4.4 - 2.0-SNAPSHOT + 2.1.0 1.4.0 3.7.1 3.2.0 @@ -288,4 +288,4 @@ - \ No newline at end of file + diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml index 13cd01ab..777ef448 100644 --- a/nebula-exchange/pom.xml +++ b/nebula-exchange/pom.xml @@ -5,7 +5,7 @@ nebula-spark com.vesoft - 2.0-SNAPSHOT + 2.1.0 ../pom.xml 4.0.0 @@ -28,7 +28,7 @@ 3.9.2 2.11.0-M4 3.7.1 - 2.0.0-SNAPSHOT + 2.0.0 1.0.0 2.4.5-M1 3.4.6 diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala index 21ffcc7c..9ddb6694 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala @@ -212,7 +212,6 @@ object Exchange { LOG.info(s"batchFailure.reimport: ${batchFailure.value}") } spark.close() - sys.exit(0) } /** diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala index 84b83076..6404df07 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/GraphProvider.scala @@ -47,11 +47,11 @@ class GraphProvider(addresses: List[HostAndPort], timeout: Int) pool.close() } - def switchSpace(session: Session, space: String): 
Boolean = { + def switchSpace(session: Session, space: String): ResultSet = { val switchStatment = s"use $space" LOG.info(s"switch space $space") val result = submit(session, switchStatment) - result.isSucceeded + result } def submit(session: Session, statement: String): ResultSet = { diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/MetaProvider.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/MetaProvider.scala index b7ba6b52..9a2b7577 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/MetaProvider.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/MetaProvider.scala @@ -51,7 +51,7 @@ class MetaProvider(addresses: List[HostAndPort], timeout: Int, retry: Int) val columns = tagSchema.getColumns for (colDef <- columns.asScala) { - schema.put(new String(colDef.getName), colDef.getType.getType.getValue) + schema.put(new String(colDef.getName), colDef.getType.getType) } schema.toMap } @@ -62,7 +62,7 @@ class MetaProvider(addresses: List[HostAndPort], timeout: Int, retry: Int) val columns = edgeSchema.getColumns for (colDef <- columns.asScala) { - schema.put(new String(colDef.getName), colDef.getType.getType.getValue) + schema.put(new String(colDef.getName), colDef.getType.getType) } schema.toMap } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index d130eb58..4a2e8d8c 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -44,7 +44,7 @@ trait Processor extends Serializable { if (row.isNullAt(index)) return null - PropertyType.findByValue(fieldTypeMap(field)) match { + fieldTypeMap(field) match { case PropertyType.STRING | PropertyType.FIXED_STRING => { var value = row.get(index).toString if (value.equals(DEFAULT_EMPTY_VALUE)) { 
@@ -76,7 +76,7 @@ trait Processor extends Serializable { return nullVal } - PropertyType.findByValue(fieldTypeMap(field)) match { + fieldTypeMap(field) match { case PropertyType.UNKNOWN => throw new IllegalArgumentException("date type in nebula is UNKNOWN.") case PropertyType.STRING | PropertyType.FIXED_STRING => { diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala index e469dd8a..5608de0d 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/writer/ServerBaseWriter.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import com.google.common.base.Optional import com.google.common.util.concurrent.{FutureCallback, RateLimiter} -import com.vesoft.nebula.ErrorCode +import com.vesoft.nebula.graph.ErrorCode import com.vesoft.nebula.exchange.config.{ ConnectionConfigEntry, DataBaseConfigEntry, @@ -137,9 +137,9 @@ class NebulaGraphClientWriter(dataBaseConfigEntry: DataBaseConfigEntry, def prepare(): Unit = { val switchResult = graphProvider.switchSpace(session, dataBaseConfigEntry.space) - if (!switchResult) { + if (!switchResult.isSucceeded) { this.close() - throw new RuntimeException("Switch Failed") + throw new RuntimeException("Switch Failed for " + switchResult.getErrorMessage) } LOG.info(s"Connection to ${dataBaseConfigEntry.graphAddress}") @@ -206,15 +206,15 @@ class NebulaWriterCallback(latch: CountDownLatch, override def onSuccess(results: java.util.List[Optional[Integer]]): Unit = { if (pathAndOffset.isDefined) { - if (results.asScala.forall(_.get() == ErrorCode.SUCCEEDED.getValue)) + if (results.asScala.forall(_.get() == ErrorCode.SUCCEEDED)) HDFSUtils.saveContent(pathAndOffset.get._1, pathAndOffset.get._2.toString) else throw new RuntimeException( - s"Some error code: 
${results.asScala.filter(_.get() != ErrorCode.SUCCEEDED.getValue).head} appear") + s"Some error code: ${results.asScala.filter(_.get() != ErrorCode.SUCCEEDED).head} appear") } for (result <- results.asScala) { latch.countDown() - if (result.get() == ErrorCode.SUCCEEDED.getValue) { + if (result.get() == ErrorCode.SUCCEEDED) { batchSuccess.add(1) } else { LOG.error(s"batch insert error with code ${result.get()}, batch size is ${results.size()}") diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala index 6510e36f..b67e56a3 100644 --- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala +++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala @@ -56,20 +56,20 @@ class ProcessorSuite extends Processor { )) val row = new GenericRowWithSchema(values.toArray, schema) val map = Map( - "col1" -> PropertyType.STRING.getValue, - "col2" -> PropertyType.FIXED_STRING.getValue, - "col3" -> PropertyType.INT8.getValue, - "col4" -> PropertyType.INT16.getValue, - "col5" -> PropertyType.INT32.getValue, - "col6" -> PropertyType.INT64.getValue, - "col7" -> PropertyType.DATE.getValue, - "col8" -> PropertyType.DATETIME.getValue, - "col9" -> PropertyType.TIME.getValue, - "col10" -> PropertyType.TIMESTAMP.getValue, - "col11" -> PropertyType.BOOL.getValue, - "col12" -> PropertyType.DOUBLE.getValue, - "col13" -> PropertyType.FLOAT.getValue, - "col14" -> PropertyType.STRING.getValue + "col1" -> PropertyType.STRING, + "col2" -> PropertyType.FIXED_STRING, + "col3" -> PropertyType.INT8, + "col4" -> PropertyType.INT16, + "col5" -> PropertyType.INT32, + "col6" -> PropertyType.INT64, + "col7" -> PropertyType.DATE, + "col8" -> PropertyType.DATETIME, + "col9" -> PropertyType.TIME, + "col10" -> PropertyType.TIMESTAMP, + "col11" -> PropertyType.BOOL, + "col12" -> PropertyType.DOUBLE, + 
"col13" -> PropertyType.FLOAT, + "col14" -> PropertyType.STRING ) @Test diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/utils/NebulaUtilsSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/utils/NebulaUtilsSuite.scala index e2e6f461..9628951a 100644 --- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/utils/NebulaUtilsSuite.scala +++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/utils/NebulaUtilsSuite.scala @@ -89,18 +89,18 @@ class NebulaUtilsSuite { val map: Map[String, Int] = NebulaUtils.getDataSourceFieldType(sourceConfig, space, metaProvider) - assert(map("col1") == PropertyType.STRING.getValue) - assert(map("col2") == PropertyType.FIXED_STRING.getValue) - assert(map("col3") == PropertyType.INT8.getValue) - assert(map("col4") == PropertyType.INT16.getValue) - assert(map("col5") == PropertyType.INT32.getValue) - assert(map("col6") == PropertyType.INT64.getValue) - assert(map("col7") == PropertyType.DATE.getValue) - assert(map("col8") == PropertyType.DATETIME.getValue) - assert(map("col9") == PropertyType.TIMESTAMP.getValue) - assert(map("col10") == PropertyType.BOOL.getValue) - assert(map("col11") == PropertyType.DOUBLE.getValue) - assert(map("col12") == PropertyType.FLOAT.getValue) + assert(map("col1") == PropertyType.STRING) + assert(map("col2") == PropertyType.FIXED_STRING) + assert(map("col3") == PropertyType.INT8) + assert(map("col4") == PropertyType.INT16) + assert(map("col5") == PropertyType.INT32) + assert(map("col6") == PropertyType.INT64) + assert(map("col7") == PropertyType.DATE) + assert(map("col8") == PropertyType.DATETIME) + assert(map("col9") == PropertyType.TIMESTAMP) + assert(map("col10") == PropertyType.BOOL) + assert(map("col11") == PropertyType.DOUBLE) + assert(map("col12") == PropertyType.FLOAT) } @Test diff --git a/nebula-spark-connector/pom.xml b/nebula-spark-connector/pom.xml index 35854dfd..0cfd9453 100644 --- a/nebula-spark-connector/pom.xml +++ 
b/nebula-spark-connector/pom.xml @@ -5,7 +5,7 @@ nebula-spark com.vesoft - 2.0-SNAPSHOT + 2.1.0 ../pom.xml 4.0.0 @@ -14,7 +14,7 @@ 2.4.4 - 2.0.0-SNAPSHOT + 2.0.0 1.8 1.8 3.2.3 @@ -151,6 +151,29 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + **/*Test.* + **/*Suite.* + + + + + org.scalatest + scalatest-maven-plugin + 2.0.0 + + + test + + test + + + + org.apache.maven.plugins maven-javadoc-plugin @@ -179,4 +202,4 @@ - \ No newline at end of file + diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala index f20d58a2..12dac925 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala @@ -226,6 +226,8 @@ object WriteNebulaVertexConfig { || vidPolicy.equalsIgnoreCase(KeyPolicy.UUID.toString), "config vidPolicy is illegal, please don't set vidPolicy or set vidPolicy \"HASH\" or \"UUID\"" ) + assert(user != null && !user.isEmpty, "user is empty") + assert(passwd != null && !passwd.isEmpty, "passwd is empty") try { WriteMode.withName(writeMode.toLowerCase()) } catch { @@ -442,6 +444,8 @@ object WriteNebulaEdgeConfig { "config dstPolicy is illegal, please don't set dstPolicy or set dstPolicy \"HASH\" or \"UUID\"" ) assert(batch > 0, s"config batch must be positive, your batch is $batch.") + assert(user != null && !user.isEmpty, "user is empty") + assert(passwd != null && !passwd.isEmpty, "passwd is empty") try { WriteMode.withName(writeMode.toLowerCase) } catch { @@ -478,7 +482,6 @@ class ReadNebulaConfig(space: String, def getNoColumn = noColumn def getPartitionNum = partitionNum def getLimit = limit - // todo add filter } /** diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala 
b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala index ab1abdea..8c062ba8 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/nebula/MetaProvider.scala @@ -47,7 +47,7 @@ class MetaProvider(addresses: List[Address]) extends AutoCloseable { val columns = tagSchema.getColumns for (colDef <- columns.asScala) { - schema.put(new String(colDef.getName), colDef.getType.getType.getValue) + schema.put(new String(colDef.getName), colDef.getType.getType) } schema.toMap } @@ -58,7 +58,7 @@ class MetaProvider(addresses: List[Address]) extends AutoCloseable { val columns = edgeSchema.getColumns for (colDef <- columns.asScala) { - schema.put(new String(colDef.getName), colDef.getType.getType.getValue) + schema.put(new String(colDef.getName), colDef.getType.getType) } schema.toMap } diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaExecutor.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaExecutor.scala index c02c3366..01f5b601 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaExecutor.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/writer/NebulaExecutor.scala @@ -142,7 +142,7 @@ object NebulaExecutor { val propValue = record.get(index, types(index)) val fieldName = schema.fields(index).name - PropertyType.findByValue(fieldTypeMap(fieldName)) match { + fieldTypeMap(fieldName) match { case PropertyType.STRING | PropertyType.FIXED_STRING => NebulaUtils.escapeUtil(propValue.toString).mkString("\"", "", "\"") case PropertyType.DATE => "date(\"" + propValue + "\")" diff --git a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala
b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala index b9400823..c273a5b6 100644 --- a/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala +++ b/nebula-spark-connector/src/test/scala/com/vesoft/nebula/connector/writer/NebulaExecutorSuite.scala @@ -186,7 +186,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll { NebulaExecutor.toUpdateExecuteStatement("person", propNames, nebulaVertex) val expectVertexUpdate = "UPDATE VERTEX ON `person` \"vid1\" SET `col_string`=\"name\",`col_fixed_string`=\"name\"," + - "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12;" + "`col_bool`=true,`col_int`=10,`col_int64`=100,`col_double`=1.0,`col_date`=2021-11-12" assert(expectVertexUpdate.equals(updateVertexStatement)) } @@ -209,7 +209,7 @@ class NebulaExecutorSuite extends AnyFunSuite with BeforeAndAfterAll { val expectEdgeUpdate = "UPDATE EDGE ON `friend` \"source\"->\"target\"@0 SET `col_string`=\"name\"," + "`col_fixed_string`=\"name\",`col_bool`=true,`col_int`=10,`col_int64`=100," + - "`col_double`=1.0,`col_date`=2021-11-12;" + "`col_double`=1.0,`col_date`=2021-11-12" assert(expectEdgeUpdate.equals(updateEdgeStatement)) } } diff --git a/pom.xml b/pom.xml index ebbd7ca1..99bce29f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.vesoft nebula-spark pom - 2.0-SNAPSHOT + 2.1.0 UTF-8