Skip to content

Commit

Permalink
correcting the handling for logical types (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi authored Apr 26, 2017
1 parent 079f5ac commit c3c83b4
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 50 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ allprojects {
// The following 4 need to align to compile against a particular version
confluentVersion = '3.2.0'
kafkaVersion = '0.10.2.0'
dataMountaineerCommonVersion = "0.7.4"
dataMountaineerCommonVersion = "0.7.6"
dataMountaineerTestKitVersion = "0.5"

// For 3.1.x versions use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.text.SimpleDateFormat
import java.util.TimeZone

import com.datamountaineer.streamreactor.connect.hbase.BytesHelper._
import com.datamountaineer.streamreactor.connect.hbase.StructFieldsExtractorBytes._
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.data._

Expand Down Expand Up @@ -57,42 +56,40 @@ case class StructFieldsExtractorBytes(includeAllFields: Boolean, fieldsAliasMap:
/**
  * Serialises the value stored for `field` in `struct` into bytes.
  *
  * Logical types are recognised by the schema name and handled first. The value may
  * arrive either in its raw wire form or already converted to its logical Java type,
  * so both shapes are accepted:
  *  - Decimal:   `java.math.BigDecimal` or `Array[Byte]`
  *  - Date:      `java.util.Date` or `Int`, rendered with `DateFormat`
  *  - Time:      `java.util.Date` or `Int`, rendered with `TimeFormat`
  *  - Timestamp: `java.util.Date` or `Long`, rendered with `DateFormat`
  *
  * Any other schema falls back to a conversion chosen by the primitive schema type.
  *
  * @param field  the field whose value is read
  * @param struct the record holding the value
  * @return the encoded bytes, or `None` when the struct holds `null` for the field
  * @throws IllegalArgumentException when a logical-type value has an unexpected runtime class
  */
private def getFieldBytes(field: Field, struct: Struct): Option[Array[Byte]] = {
  val schema = field.schema()

  def unsupported(value: Any): Nothing =
    throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")

  Option(struct.get(field)).map { raw =>
    // Widened to Any so that the primitive type patterns (Int / Long) below are legal.
    val value: Any = raw

    // schema.name() is null for plain (non-logical) schemas, hence the Option wrapper.
    Option(schema.name()).collect {
      case Decimal.LOGICAL_NAME =>
        value match {
          case bd: java.math.BigDecimal => bd.fromBigDecimal()
          case arr: Array[Byte] => Decimal.toLogical(schema, arr).fromBigDecimal()
          case other => unsupported(other)
        }

      case Date.LOGICAL_NAME =>
        value match {
          case d: java.util.Date => StructFieldsExtractorBytes.DateFormat.format(d).fromString()
          case i: Int => StructFieldsExtractorBytes.DateFormat.format(Date.toLogical(schema, i)).fromString()
          case other => unsupported(other)
        }

      case Time.LOGICAL_NAME =>
        value match {
          case i: Int => StructFieldsExtractorBytes.TimeFormat.format(Time.toLogical(schema, i)).fromString()
          case d: java.util.Date => StructFieldsExtractorBytes.TimeFormat.format(d).fromString()
          case other => unsupported(other)
        }

      case Timestamp.LOGICAL_NAME =>
        value match {
          case d: java.util.Date => StructFieldsExtractorBytes.DateFormat.format(d).fromString()
          case l: Long => StructFieldsExtractorBytes.DateFormat.format(Timestamp.toLogical(schema, l)).fromString()
          case other => unsupported(other)
        }
    }.getOrElse {
      // Not a recognised logical type: encode according to the primitive schema type.
      schema.`type`() match {
        case Schema.Type.BOOLEAN => raw.fromBoolean()
        case Schema.Type.BYTES => raw.fromBytes()
        case Schema.Type.FLOAT32 => raw.fromFloat()
        case Schema.Type.FLOAT64 => raw.fromDouble()
        case Schema.Type.INT8 => raw.fromByte()
        case Schema.Type.INT16 => raw.fromShort()
        case Schema.Type.INT32 => raw.fromInt()
        case Schema.Type.INT64 => raw.fromLong()
        case Schema.Type.STRING => raw.fromString()
        case other => sys.error(s"$other is not a recognized schema!")
      }
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,25 @@ case class StructFieldsExtractor(includeAllFields: Boolean,
}

schema.name() match {
case Decimal.LOGICAL_NAME => Decimal.toLogical(schema, value.asInstanceOf[Array[Byte]])
case Date.LOGICAL_NAME => StructFieldsExtractor.DateFormat.format(Date.toLogical(schema, value.asInstanceOf[Int]))
case Time.LOGICAL_NAME => StructFieldsExtractor.TimeFormat.format(Time.toLogical(schema, value.asInstanceOf[Int]))
case Timestamp.LOGICAL_NAME => StructFieldsExtractor.DateFormat.format(Timestamp.toLogical(schema, value.asInstanceOf[Long]))
case Decimal.LOGICAL_NAME =>
value match {
case java.math.BigDecimal => value
case arr: Array[Byte] => Decimal.toLogical(schema, arr)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}
case Time.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case i: Int => StructFieldsExtractor.TimeFormat.format(Time.toLogical(schema, i))
case d@java.util.Date => StructFieldsExtractor.TimeFormat.format(d)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}

case Timestamp.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case d@java.util.Date => StructFieldsExtractor.DateFormat.format(d)
case l: Long => StructFieldsExtractor.DateFormat.format(Timestamp.toLogical(schema, l))
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}
case _ => value
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,31 @@ object TagsExtractor extends StrictLogging {
Option(struct.schema().field(key))
.flatMap { field =>
val schema = field.schema()
val value = schema.name() match {
case Decimal.LOGICAL_NAME => Decimal.toLogical(schema, struct.getBytes(key))
case Date.LOGICAL_NAME => StructFieldsExtractor.DateFormat.format(Date.toLogical(schema, struct.getInt32(key)))
case Time.LOGICAL_NAME => StructFieldsExtractor.TimeFormat.format(Time.toLogical(schema, struct.getInt32(key)))
case Timestamp.LOGICAL_NAME => StructFieldsExtractor.DateFormat.format(Timestamp.toLogical(schema, struct.getInt64(key)))
case _ => struct.get(key)
Option(struct.get(field)).map { value =>
schema.name() match {
case Decimal.LOGICAL_NAME =>
value match {
case java.math.BigDecimal => value
case arr: Array[Byte] => Decimal.toLogical(schema, arr)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}
case Time.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case i: Int => StructFieldsExtractor.TimeFormat.format(Time.toLogical(schema, i))
case d@java.util.Date => StructFieldsExtractor.TimeFormat.format(d)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}

case Timestamp.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case d@java.util.Date => StructFieldsExtractor.DateFormat.format(d)
case l: Long => StructFieldsExtractor.DateFormat.format(Timestamp.toLogical(schema, l))
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}
case _ => value
}
}
Option(value)

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.datamountaineer.streamreactor.connect.voltdb

import java.text.SimpleDateFormat
import java.util.TimeZone

import com.typesafe.scalalogging.slf4j.StrictLogging
import org.apache.kafka.connect.data.{Field, Struct, _}

Expand Down Expand Up @@ -55,10 +58,26 @@ case class StructFieldsExtractor(targetTable: String,
.map { value =>
//handle specific schema
schema.name() match {
case Decimal.LOGICAL_NAME => Decimal.toLogical(schema, value.asInstanceOf[Array[Byte]])
case Date.LOGICAL_NAME => Date.toLogical(schema, value.asInstanceOf[Int])
case Time.LOGICAL_NAME => Time.toLogical(schema, value.asInstanceOf[Int])
case Timestamp.LOGICAL_NAME => Timestamp.toLogical(schema, value.asInstanceOf[Long])
case Decimal.LOGICAL_NAME =>
value match {
case java.math.BigDecimal => value
case arr: Array[Byte] => Decimal.toLogical(schema, arr)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}
case Time.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case i: Int => StructFieldsExtractor.TimeFormat.format(Time.toLogical(schema, i))
case d@java.util.Date => StructFieldsExtractor.TimeFormat.format(d)
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}

case Timestamp.LOGICAL_NAME =>
value.asInstanceOf[Any] match {
case d@java.util.Date => StructFieldsExtractor.DateFormat.format(d)
case l: Long => StructFieldsExtractor.DateFormat.format(Timestamp.toLogical(schema, l))
case _ => throw new IllegalArgumentException(s"${field.name()} is not handled for value:$value")
}

case _ => value
}
}.orNull
Expand All @@ -69,3 +88,8 @@ case class StructFieldsExtractor(targetTable: String,
}


/**
  * Shared formatters used to render date/time values as text.
  *
  * NOTE(review): `SimpleDateFormat` instances are not synchronised per the JDK
  * documentation; confirm these are never used from several threads at once.
  */
object StructFieldsExtractor {
  /** Date-time pattern with a literal trailing 'Z'; always formats in UTC. */
  val DateFormat = {
    val utcFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
    utcFormat
  }

  /** Time-of-day pattern with a numeric zone offset; no time zone is set, so the formatter's default applies. */
  val TimeFormat: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSSZ")
}

0 comments on commit c3c83b4

Please sign in to comment.