Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the microsec process for datetime #34

Merged
merged 1 commit into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@

/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.{
Date,
DateTime,
NullType,
Time,
Value,
Geography,
Coordinate,
Point,
LineString,
Polygon
}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
* processor is a converter.
* It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge,
Expand All @@ -35,7 +46,7 @@ trait Processor extends Serializable {
* eg: convert attribute value 2020-01-01 to date("2020-01-01")
*
* Time type: add time() function for attribute value.
* eg: convert attribute value 12:12:12:1111 to time("12:12:12:1111")
* eg: convert attribute value 12:12:12.1111 to time("12:12:12.1111")
*
* DataTime type: add datetime() function for attribute value.
* eg: convert attribute value 2020-01-01T22:30:40 to datetime("2020-01-01T22:30:40")
Expand Down Expand Up @@ -98,10 +109,10 @@ trait Processor extends Serializable {
throw new UnsupportedOperationException(
s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000")
}
new Time(values(0).toByte,
values(1).toByte,
values(2).toByte,
if (values.length > 3) values(3).toInt else 0)
val secs: Array[String] = values(2).split("\\.")
val sec: Byte = secs(0).toByte
val microSec: Int = if (secs.length == 2) secs(1).toInt else 0
new Time(values(0).toByte, values(1).toByte, sec, microSec)
}
case PropertyType.DATE => {
val values = row.get(index).toString.split("-")
Expand All @@ -121,13 +132,13 @@ trait Processor extends Serializable {
} else {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
}

if (dateTimeValue.size < 2) {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
}

val dateValues = dateTimeValue(0).split("-")
Expand All @@ -136,16 +147,18 @@ trait Processor extends Serializable {
if (dateValues.size < 3 || timeValues.size < 3) {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00")
}

val microsec: Int = if (timeValues.size == 4) timeValues(3).toInt else 0
val secs: Array[String] = timeValues(2).split("\\.")
val sec: Byte = secs(0).toByte
val microsec: Int = if (secs.length == 2) secs(1).toInt else 0
new DateTime(dateValues(0).toShort,
dateValues(1).toByte,
dateValues(2).toByte,
timeValues(0).toByte,
timeValues(1).toByte,
timeValues(2).toByte,
sec,
microsec)
}
case PropertyType.TIMESTAMP => {
Expand All @@ -157,7 +170,7 @@ trait Processor extends Serializable {
row.get(index).toString.toLong
}
case PropertyType.GEOGRAPHY => {
val wkt = row.get(index).toString
val wkt = row.get(index).toString
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
Expand Down Expand Up @@ -186,18 +199,18 @@ trait Processor extends Serializable {
}
case "LineString" => {
val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString]
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsCoordList) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Geography.lsVal(new LineString(coordList.asJava))
}
case "Polygon" => {
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsShell.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Expand All @@ -206,7 +219,7 @@ trait Processor extends Serializable {
val jtsHolesNum = jtsPolygon.getNumInteriorRing
for (i <- 0 until jtsHolesNum) {
val coordList = new ListBuffer[Coordinate]()
val jtsHole = jtsPolygon.getInteriorRingN(i)
val jtsHole = jtsPolygon.getInteriorRingN(i)
for (jtsCoord <- jtsHole.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
package scala.com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.exchange.processor.Processor
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.{
Date,
DateTime,
NullType,
Time,
Value,
Geography,
Coordinate,
Point,
LineString,
Polygon
}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{
Expand All @@ -30,8 +41,8 @@ class ProcessorSuite extends Processor {
1000,
100000,
"2021-01-01",
"2021-01-01T12:00:00",
"12:00:00",
"2021-01-01T12:00:00.100",
"12:00:00.100",
"2021-01-01T12:00:00",
true,
12.01,
Expand Down Expand Up @@ -92,8 +103,9 @@ class ProcessorSuite extends Processor {
assert(extraValueForClient(row, "col6", map).toString.toLong == 100000)
assert(extraValueForClient(row, "col7", map).toString.equals("date(\"2021-01-01\")"))
assert(
extraValueForClient(row, "col8", map).toString.equals("datetime(\"2021-01-01T12:00:00\")"))
assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00\")"))
extraValueForClient(row, "col8", map).toString
.equals("datetime(\"2021-01-01T12:00:00.100\")"))
assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00.100\")"))
assert(
extraValueForClient(row, "col10", map).toString.equals("timestamp(\"2021-01-01T12:00:00\")"))
assert(extraValueForClient(row, "col11", map).toString.toBoolean)
Expand All @@ -120,10 +132,10 @@ class ProcessorSuite extends Processor {
assert(extraValueForSST(row, "col6", map).toString.toLong == 100000)
val date = new Date(2021, 1, 1)
assert(extraValueForSST(row, "col7", map).equals(date))
val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 0)
val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 100)
assert(extraValueForSST(row, "col8", map).equals(datetime))

val time = new Time(12, 0, 0, 0)
val time = new Time(12, 0, 0, 100)
assert(extraValueForSST(row, "col9", map).equals(time))

try {
Expand All @@ -141,7 +153,7 @@ class ProcessorSuite extends Processor {
assert(extraValueForSST(row, "col14", map).equals(nullValue))

// POINT(3 8)
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPointExpect = extraValueForSST(row, "col15", map)

assert(geogPointExpect.equals(geogPoint))
Expand Down