Release/3.2.0 #133

Merged
merged 3 commits on Dec 1, 2023
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,3 +1,8 @@
Version 3.2.0 (2023-12-01)
--------------------------
Parse Event from a ByteBuffer (#130)
Add _schema_version field to each context during toShreddedJson transformation (#132)

Version 3.1.0 (2023-03-17)
--------------------------
Disable validation of field lengths when parsing event (#127)
2 changes: 1 addition & 1 deletion README.md
@@ -45,7 +45,7 @@ limitations under the License.
[license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat
[license]: http://www.apache.org/licenses/LICENSE-2.0

-[release-image]: http://img.shields.io/badge/release-2.0.1-blue.svg?style=flat
+[release-image]: http://img.shields.io/badge/release-3.2.0-blue.svg?style=flat
[releases]: https://github.com/snowplow/snowplow-scala-analytics-sdk/releases

[scala-doc]: http://snowplow.github.io/snowplow-scala-analytics-sdk/
@@ -12,10 +12,13 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import shapeless._
import shapeless.ops.record._
import shapeless.ops.hlist._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}

private[scalasdk] trait Parser[A] extends TSVParser[A] {
@@ -42,10 +45,37 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
decoded.map(decodedValue => generic.from(decodedValue))
}
}

def parseBytes(row: ByteBuffer): DecodeResult[A] = {
val values = Parser.splitBuffer(row)
if (values.length == 1)
Validated.Invalid(NotTSV)
else if (values.length != expectedNumFields)
Validated.Invalid(FieldNumberMismatch(values.length))
else {
val decoded = decoder.decodeBytes(values.result()).leftMap(e => RowDecodingError(e))
decoded.map(decodedValue => generic.from(decodedValue))
}
}
}

object Parser {

private val tab: Byte = '\t'.toByte

private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
var current = row.duplicate
val builder = ListBuffer(current)
(row.position() until row.limit()).foreach { i =>
if (row.get(i) === tab) {
current.limit(i)
current = row.duplicate.position(i + 1)
builder += current
}
}
builder
}

private[scalasdk] sealed trait DeriveParser[A] {

def knownKeys[R <: HList, K <: HList, L <: HList](
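
splitBuffer never copies field bytes: ByteBuffer.duplicate shares the row's backing storage, and each duplicate carries only its own position and limit, so every field is a window into the original buffer. A minimal standalone sketch of that windowing (object name and values hypothetical):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object SplitSketch extends App {
  // Wrap a small TSV row; wrap() itself copies nothing.
  val row = ByteBuffer.wrap("a\tbc\td".getBytes(StandardCharsets.UTF_8))

  // duplicate() shares the backing bytes; only position/limit are independent.
  val first = row.duplicate()
  first.limit(1) // window over bytes [0, 1): "a"

  val second = row.duplicate()
  second.position(2) // skip past the first tab
  second.limit(4)    // window over bytes [2, 4): "bc"

  println(StandardCharsets.UTF_8.decode(first).toString)  // prints: a
  println(StandardCharsets.UTF_8.decode(second).toString) // prints: bc
}
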
@@ -16,6 +16,7 @@ import shapeless._
import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import java.nio.ByteBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError

private[scalasdk] trait RowDecoderCompanion {
@@ -44,6 +45,21 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

/** Parse a TSV row, given as a list of ByteBuffers, into an HList */
private def parseBytes[H: ValueDecoder, T <: HList](
key: Key,
tailDecoder: RowDecoder[T],
maxLength: Option[Int],
row: List[ByteBuffer]
): RowDecodeResult[H :: T] =
row match {
case h :: t =>
val hv: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, h, maxLength).toValidatedNel
val tv: RowDecodeResult[T] = tailDecoder.decodeBytes(t)
(hv, tv).mapN(_ :: _)
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

implicit def hnilFromRow: DeriveRowDecoder[HNil] =
new DeriveRowDecoder[HNil] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[HNil] =
@@ -55,6 +71,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[HNil] =
row match {
case Nil =>
HNil.validNel
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}
}

@@ -67,6 +91,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H :: T] {
def apply(row: List[String]): RowDecodeResult[H :: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H :: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
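
The (hv, tv).mapN(_ :: _) step above is why a bad row reports every offending field rather than just the first: ValidatedNel composes applicatively and concatenates errors from both sides. A toy cats sketch of the same accumulation pattern (decoders hypothetical, not the SDK's):

import cats.data.ValidatedNel
import cats.syntax.apply._
import cats.syntax.option._

def decodeInt(s: String): ValidatedNel[String, Int] =
  s.toIntOption.toValidNel(s"not an int: $s")

// Both validations run to completion; failures are concatenated.
val ok  = (decodeInt("1"), decodeInt("2")).mapN(_ + _) // Valid(3)
val bad = (decodeInt("x"), decodeInt("y")).mapN(_ + _) // Invalid(NonEmptyList("not an int: x", "not an int: y"))
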
@@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import cats.implicits._
import cats.data.{NonEmptyList, Validated}
import java.nio.ByteBuffer
import scala.collection.mutable.ListBuffer
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV, RowDecodingError}
import scala.deriving._
import scala.compiletime._
@@ -30,10 +33,32 @@ private[scalasdk] trait Parser[A] extends TSVParser[A] {
else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
else decoder(values.toList).leftMap(e => RowDecodingError(e))
}

def parseBytes(row: ByteBuffer): DecodeResult[A] = {
val values = Parser.splitBuffer(row)
if (values.length == 1) Validated.Invalid(NotTSV)
else if (values.length != expectedNumFields) Validated.Invalid(FieldNumberMismatch(values.length))
else decoder.decodeBytes(values.result()).leftMap(e => RowDecodingError(e))
}
}

object Parser {

private val tab: Byte = '\t'.toByte

private def splitBuffer(row: ByteBuffer): ListBuffer[ByteBuffer] = {
var current = row.duplicate
val builder = ListBuffer(current)
(row.position() until row.limit()).foreach { i =>
if (row.get(i) === tab) {
current.limit(i)
current = row.duplicate.position(i + 1)
builder += current
}
}
builder
}

private[scalasdk] sealed trait DeriveParser[A] {
inline def knownKeys(implicit mirror: Mirror.ProductOf[A]): List[String] =
constValueTuple[mirror.MirroredElemLabels].toArray.map(_.toString).toList
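
The guard clauses in parseBytes surface directly at the Event level: a buffer containing no tab at all is rejected as NotTSV, and a tab-separated buffer with the wrong arity as FieldNumberMismatch. A hedged sketch (inputs illustrative):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import cats.data.Validated
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{FieldNumberMismatch, NotTSV}

def bytes(s: String): ByteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))

// No tab: splitBuffer yields a single field, so the row is not TSV.
assert(Event.parseBytes(bytes("not a tsv line")) == Validated.Invalid(NotTSV))

// Tab-separated but far too few fields: the arity found is reported back.
assert(Event.parseBytes(bytes("a\tb")) == Validated.Invalid(FieldNumberMismatch(2)))
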
@@ -16,6 +16,7 @@ import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnhandledRowDecodingError
import java.nio.ByteBuffer
import scala.deriving._
import scala.compiletime._

@@ -52,6 +53,20 @@ private[scalasdk] trait RowDecoderCompanion {
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

private def parseBytes[H: ValueDecoder, T <: Tuple](
key: Key,
tailDecoder: RowDecoder[T],
maxLength: Option[Int],
row: List[ByteBuffer]
): RowDecodeResult[H *: T] =
row match {
case h :: t =>
val hv: RowDecodeResult[H] = ValueDecoder[H].parseBytes(key, h, maxLength).toValidatedNel
val tv: RowDecodeResult[T] = tailDecoder.decodeBytes(t)
(hv, tv).mapN(_ *: _)
case Nil => UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

implicit def hnilFromRow: DeriveRowDecoder[EmptyTuple] =
new DeriveRowDecoder[EmptyTuple] {
def get(knownKeys: List[Key], maxLengths: Map[String, Int]): RowDecoder[EmptyTuple] =
@@ -63,6 +78,14 @@ private[scalasdk] trait RowDecoderCompanion {
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}

def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[EmptyTuple] =
row match {
case Nil =>
EmptyTuple.validNel
case _ =>
UnhandledRowDecodingError("Not enough values, format is invalid").invalidNel
}
}
}

@@ -75,6 +98,7 @@ private[scalasdk] trait RowDecoderCompanion {
val maxLength = maxLengths.get(key.name)
new RowDecoder[H *: T] {
def apply(row: List[String]): RowDecodeResult[H *: T] = parse(key, tailDecoder, maxLength, row)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[H *: T] = parseBytes(key, tailDecoder, maxLength, row)
}
case Nil =>
// Shapeless type checking makes this impossible
@@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk
import java.time.Instant
import java.util.UUID
import java.time.format.DateTimeFormatter
import java.nio.ByteBuffer

// circe
import io.circe.{Decoder, Encoder, Json, JsonObject}
@@ -280,6 +281,9 @@ object Event {
def parse(line: String): DecodeResult[Event] =
stdParser.parse(line)

def parseBytes(bytes: ByteBuffer): DecodeResult[Event] =
stdParser.parseBytes(bytes)

private lazy val fieldNames: List[String] =
Parser.deriveFor[Event].knownKeys

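
A hedged usage sketch of the new entry point; enrichedLine stands in for a complete enriched-event TSV record, e.g. one consumed from a stream as raw bytes without materializing an intermediate String:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import cats.data.Validated
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

val enrichedLine: String = ??? // hypothetical: a real enriched-event TSV line
val buffer = ByteBuffer.wrap(enrichedLine.getBytes(StandardCharsets.UTF_8))

Event.parseBytes(buffer) match {
  case Validated.Valid(event) => println(event.event_id)
  case Validated.Invalid(err) => println(s"parse failure: $err")
}
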
@@ -61,15 +61,18 @@ object SnowplowEvent {
case class Contexts(data: List[SelfDescribingData[Json]]) extends AnyVal {
def toShreddedJson: Map[String, Json] =
data.groupBy(x => (x.schema.vendor, x.schema.name, x.schema.format, x.schema.version.model)).map {
-      case ((vendor, name, _, model), d) =>
-        (transformSchema(Data.Contexts(Data.CustomContexts), vendor, name, model),
-         d.map { selfdesc =>
-           selfdesc.data
-         }.asJson
-        )
+      case ((vendor, name, _, model), contextsSdd) =>
+        val transformedName = transformSchema(Data.Contexts(Data.CustomContexts), vendor, name, model)
+        val transformedData = contextsSdd.map(addSchemaVersionToData).asJson
+        (transformedName, transformedData)
}
}

private def addSchemaVersionToData(contextSdd: SelfDescribingData[Json]): Json = {
val version = Json.obj("_schema_version" -> contextSdd.schema.toSchemaUri.asJson)
contextSdd.data.deepMerge(version)
}

implicit final val contextsCirceEncoder: Encoder[Contexts] =
Encoder.instance { contexts =>
if (contexts.data.isEmpty) JsonObject.empty.asJson
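
The effect of addSchemaVersionToData on the shredded output, as a hedged sketch (schema and payload illustrative): each context object is deep-merged with a _schema_version field holding its full Iglu URI.

import io.circe.Json
import io.circe.syntax._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts

val context = SelfDescribingData(
  SchemaKey("com.acme", "my_context", "jsonschema", SchemaVer.Full(1, 0, 0)),
  Json.obj("a" := 1)
)

// The map value for this context group now reads, roughly:
//   [{"a": 1, "_schema_version": "iglu:com.acme/my_context/jsonschema/1-0-0"}]
val shredded: Map[String, Json] = Contexts(List(context)).toShreddedJson
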
@@ -13,11 +13,15 @@
package com.snowplowanalytics.snowplow.analytics.scalasdk
package decode

import java.nio.ByteBuffer

private[scalasdk] trait RowDecoder[L] extends Serializable { self =>
def apply(row: List[String]): RowDecodeResult[L]
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[L]
def map[B](f: L => B): RowDecoder[B] =
new RowDecoder[B] {
def apply(row: List[String]): RowDecodeResult[B] = self.apply(row).map(f)
def decodeBytes(row: List[ByteBuffer]): RowDecodeResult[B] = self.decodeBytes(row).map(f)
}
}

@@ -12,7 +12,10 @@
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk.decode

import java.nio.ByteBuffer

/** Parser for a TSV-encoded string */
trait TSVParser[A] extends Serializable {
def parseBytes(bytes: ByteBuffer): DecodeResult[A]
def parse(row: String): DecodeResult[A]
}
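
With both entry points on TSVParser, calling code can stay agnostic about the input representation. A minimal sketch, assuming it lives in the same decode package so DecodeResult is in scope (helper name hypothetical):

import java.nio.ByteBuffer

// Picks the matching entry point for whichever representation the caller holds.
def parseAny[A](parser: TSVParser[A], input: Either[String, ByteBuffer]): DecodeResult[A] =
  input.fold(parser.parse, parser.parseBytes)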