From ae78a44fd6ed048a142a6339f61a550cd435d838 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 7 Dec 2021 17:18:52 +0800 Subject: [PATCH] fix the microsec process --- .../nebula/exchange/processor/Processor.scala | 51 ++++++++++++------- .../exchange/processor/ProcessorSuite.scala | 28 +++++++--- 2 files changed, 52 insertions(+), 27 deletions(-) diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala index f33faf31..a33964c2 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala @@ -1,4 +1,3 @@ - /* Copyright (c) 2020 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. @@ -6,7 +5,18 @@ package com.vesoft.nebula.exchange.processor -import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon} +import com.vesoft.nebula.{ + Date, + DateTime, + NullType, + Time, + Value, + Geography, + Coordinate, + Point, + LineString, + Polygon +} import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils} import com.vesoft.nebula.meta.PropertyType @@ -14,6 +24,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, LongType, StringType} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer + /** * processor is a converter. * It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge, @@ -35,7 +46,7 @@ trait Processor extends Serializable { * eg: convert attribute value 2020-01-01 to date("2020-01-01") * * Time type: add time() function for attribute value. - * eg: convert attribute value 12:12:12:1111 to time("12:12:12:1111") + * eg: convert attribute value 12:12:12.1111 to time("12:12:12.1111") * * DataTime type: add datetime() function for attribute value. * eg: convert attribute value 2020-01-01T22:30:40 to datetime("2020-01-01T22:30:40") @@ -98,10 +109,10 @@ trait Processor extends Serializable { throw new UnsupportedOperationException( s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000") } - new Time(values(0).toByte, - values(1).toByte, - values(2).toByte, - if (values.length > 3) values(3).toInt else 0) + val secs: Array[String] = values(2).split("\\.") + val sec: Byte = secs(0).toByte + val microSec: Int = if (secs.length == 2) secs(1).toInt else 0 + new Time(values(0).toByte, values(1).toByte, sec, microSec) } case PropertyType.DATE => { val values = row.get(index).toString.split("-") @@ -121,13 +132,13 @@ trait Processor extends Serializable { } else { throw new UnsupportedOperationException( s"wrong format for datetime value: $rowValue, " + - s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000") + s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000") } if (dateTimeValue.size < 2) { throw new UnsupportedOperationException( s"wrong format for datetime value: $rowValue, " + - s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000") + s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000") } val dateValues = dateTimeValue(0).split("-") @@ -136,16 +147,18 @@ trait Processor extends Serializable { if (dateValues.size < 3 || timeValues.size < 3) { throw new UnsupportedOperationException( s"wrong format for datetime value: $rowValue, " + - s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00") + s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00") } - val microsec: Int = if (timeValues.size == 4) timeValues(3).toInt else 0 + val secs: Array[String] = timeValues(2).split("\\.") + val sec: Byte = secs(0).toByte + val microsec: Int = if (secs.length == 2) secs(1).toInt else 0 new DateTime(dateValues(0).toShort, dateValues(1).toByte, dateValues(2).toByte, timeValues(0).toByte, timeValues(1).toByte, - timeValues(2).toByte, + sec, microsec) } case PropertyType.TIMESTAMP => { @@ -157,7 +170,7 @@ trait Processor extends Serializable { row.get(index).toString.toLong } case PropertyType.GEOGRAPHY => { - val wkt = row.get(index).toString + val wkt = row.get(index).toString val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt) convertJTSGeometryToGeography(jtsGeom) } @@ -186,18 +199,18 @@ trait Processor extends Serializable { } case "LineString" => { val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString] - val jtsCoordList = jtsLineString.getCoordinates - val coordList = new ListBuffer[Coordinate]() + val jtsCoordList = jtsLineString.getCoordinates + val coordList = new ListBuffer[Coordinate]() for (jtsCoord <- jtsCoordList) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } Geography.lsVal(new LineString(coordList.asJava)) } case "Polygon" => { - val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon] + val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon] val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]() - val jtsShell = jtsPolygon.getExteriorRing - val coordList = new ListBuffer[Coordinate]() + val jtsShell = jtsPolygon.getExteriorRing + val coordList = new ListBuffer[Coordinate]() for (jtsCoord <- jtsShell.getCoordinates) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } @@ -206,7 +219,7 @@ trait Processor extends Serializable { val jtsHolesNum = jtsPolygon.getNumInteriorRing for (i <- 0 until jtsHolesNum) { val coordList = new ListBuffer[Coordinate]() - val jtsHole = jtsPolygon.getInteriorRingN(i) + val jtsHole = jtsPolygon.getInteriorRingN(i) for (jtsCoord <- jtsHole.getCoordinates) { coordList += new Coordinate(jtsCoord.x, jtsCoord.y) } diff --git a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala index a4c0d539..87e709e9 100644 --- a/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala +++ b/nebula-exchange/src/test/scala/com/vesoft/nebula/exchange/processor/ProcessorSuite.scala @@ -6,7 +6,18 @@ package scala.com.vesoft.nebula.exchange.processor import com.vesoft.nebula.exchange.processor.Processor -import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon} +import com.vesoft.nebula.{ + Date, + DateTime, + NullType, + Time, + Value, + Geography, + Coordinate, + Point, + LineString, + Polygon +} import com.vesoft.nebula.meta.PropertyType import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{ @@ -30,8 +41,8 @@ class ProcessorSuite extends Processor { 1000, 100000, "2021-01-01", - "2021-01-01T12:00:00", - "12:00:00", + "2021-01-01T12:00:00.100", + "12:00:00.100", "2021-01-01T12:00:00", true, 12.01, @@ -92,8 +103,9 @@ class ProcessorSuite extends Processor { assert(extraValueForClient(row, "col6", map).toString.toLong == 100000) assert(extraValueForClient(row, "col7", map).toString.equals("date(\"2021-01-01\")")) assert( - extraValueForClient(row, "col8", map).toString.equals("datetime(\"2021-01-01T12:00:00\")")) - assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00\")")) + extraValueForClient(row, "col8", map).toString + .equals("datetime(\"2021-01-01T12:00:00.100\")")) + assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00.100\")")) assert( extraValueForClient(row, "col10", map).toString.equals("timestamp(\"2021-01-01T12:00:00\")")) assert(extraValueForClient(row, "col11", map).toString.toBoolean) @@ -120,10 +132,10 @@ class ProcessorSuite extends Processor { assert(extraValueForSST(row, "col6", map).toString.toLong == 100000) val date = new Date(2021, 1, 1) assert(extraValueForSST(row, "col7", map).equals(date)) - val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 0) + val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 100) assert(extraValueForSST(row, "col8", map).equals(datetime)) - val time = new Time(12, 0, 0, 0) + val time = new Time(12, 0, 0, 100) assert(extraValueForSST(row, "col9", map).equals(time)) try { @@ -141,7 +153,7 @@ class ProcessorSuite extends Processor { assert(extraValueForSST(row, "col14", map).equals(nullValue)) // POINT(3 8) - val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8))) + val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8))) val geogPointExpect = extraValueForSST(row, "col15", map) assert(geogPointExpect.equals(geogPoint))