BigQuery JSON column: encode as Jackson JsonNode #5523

Merged · 7 commits · Nov 18, 2024
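In brief: a field of type `Json` can now be built from, and round-tripped through, a Jackson `JsonNode`; in `TableRow` form it is carried as the node itself rather than an escaped string. A minimal sketch of the resulting usage, assuming a hypothetical annotated class `Event` (only `Json(node)`, `Json.parse` and the `toTableRow`/`fromTableRow` round-trip are taken from this PR):

```scala
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.spotify.scio.bigquery.types.{BigQueryType, Json}

// Hypothetical table class for illustration; the PR's tests use RequiredJson(a: Json).
@BigQueryType.toTable
case class Event(payload: Json)

val node = JsonNodeFactory.instance.objectNode().put("name", "Alice")

// Writing: the Json field is carried in the TableRow as a Jackson JsonNode,
// not as an escaped string.
val row = BigQueryType.toTableRow[Event](Event(Json(node)))

// Reading: the JsonNode is rendered back into Json's string representation.
val event = Event.fromTableRow(row) // Event(Json("""{"name":"Alice"}"""))
```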
5 changes: 5 additions & 0 deletions build.sbt
@@ -424,6 +424,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[DirectAbstractMethodProblem](
"org.apache.beam.sdk.coders.Coder.getCoderArguments"
),
// added BQ Json object
ProblemFilters.exclude[MissingTypesProblem](
"com.spotify.scio.bigquery.types.package$Json$"
)
)

@@ -962,6 +966,7 @@ lazy val `scio-google-cloud-platform` = project
libraryDependencies ++= Seq(
// compile
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.google.api" % "gax" % gcpBom.key.value,
"com.google.api" % "gax-grpc" % gcpBom.key.value,
"com.google.api-client" % "google-api-client" % gcpBom.key.value,
@@ -18,14 +18,14 @@
package com.spotify.scio.bigquery

import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.bigquery.BigQueryTypedTable.Format
import com.spotify.scio.bigquery.client.BigQuery
import com.spotify.scio.bigquery.types.{BigNumeric, Geography, Json}
import com.spotify.scio.testing._
import magnolify.scalacheck.auto._
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import org.joda.time.format.DateTimeFormat
@@ -42,24 +42,53 @@ object TypedBigQueryIT {
long: Long,
float: Float,
double: Double,
numeric: BigDecimal,
string: String,
byteString: ByteString,
timestamp: Instant,
date: LocalDate,
time: LocalTime,
datetime: LocalDateTime
datetime: LocalDateTime,
geography: Geography,
json: Json,
bigNumeric: BigNumeric
)

// Workaround for millis rounding error
val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
val max = BigInt(10).pow(precision) - 1
Gen.choose(-max, max).map(BigDecimal(_, scale))
}

implicit val arbNumeric: Arbitrary[BigDecimal] =
arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
implicit val arbString: Arbitrary[String] = Arbitrary(Gen.alphaStr)
implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
Gen.alphaStr.map(ByteString.copyFromUtf8)
)
// Workaround for millis rounding error
val epochGen: Gen[Long] = Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000)
implicit val arbInstant: Arbitrary[Instant] = Arbitrary(epochGen.map(new Instant(_)))
implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(epochGen.map(new LocalDate(_)))
implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(epochGen.map(new LocalTime(_)))
implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(epochGen.map(new LocalDateTime(_)))
implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
for {
x <- Gen.numChar
y <- Gen.numChar
} yield Geography(s"POINT($x $y)")
)
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)

implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
// Precision: 76.76 (the 77th digit is partial)
arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
.map(BigNumeric.apply)
}

private val recordGen =
implicitly[Arbitrary[Record]].arbitrary
@@ -71,9 +100,9 @@
s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}"
Table.Spec(spec)
}
private val typedTable = table("records")
private val tableRowTable = table("records_tablerow")
private val avroTable = table("records_avro")
private val avroLogicalTypeTable = table("records_avro_logical_type")

private val records = Gen.listOfN(100, recordGen).sample.get
private val options = PipelineOptionsFactory
@@ -87,122 +116,57 @@
class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
import TypedBigQueryIT._

override protected def beforeAll(): Unit = {
val sc = ScioContext(options)
sc.parallelize(records).saveAsTypedBigQueryTable(tableRowTable)

sc.run()
()
}

override protected def afterAll(): Unit = {
BigQuery.defaultInstance().tables.delete(tableRowTable.ref)
BigQuery.defaultInstance().tables.delete(avroTable.ref)
BigQuery.defaultInstance().tables.delete(avroLogicalTypeTable.ref)
}

"TypedBigQuery" should "read records" in {
val sc = ScioContext(options)
sc.typedBigQuery[Record](tableRowTable) should containInAnyOrder(records)
sc.run()
val bq = BigQuery.defaultInstance()
bq.tables.delete(typedTable.ref)
bq.tables.delete(tableRowTable.ref)
bq.tables.delete(avroTable.ref)
}

it should "convert to avro format" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc.typedBigQuery[Record](tableRowTable)
.map(Record.toAvro)
.map(Record.fromAvro) should containInAnyOrder(
records
)
sc.run()
}
"TypedBigQuery" should "handle records as TableRow" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.saveAsTypedBigQueryTable(typedTable, createDisposition = CREATE_IF_NEEDED)
}.waitUntilFinish()

"BigQueryTypedTable" should "read TableRow records" in {
val sc = ScioContext(options)
sc
.bigQueryTable(tableRowTable)
.map(Record.fromTableRow) should containInAnyOrder(records)
sc.run()
runWithRealContext(options) { sc =>
val data = sc.typedBigQuery[Record](typedTable)
data should containInAnyOrder(records)
}
}

it should "read GenericRecord recors" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.map(Record.fromAvro) should containInAnyOrder(records)
sc.run()
"BigQueryTypedTable" should "handle records as TableRow format" in {
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toTableRow)
.saveAsBigQueryTable(
tableRowTable,
schema = Record.schema,
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(tableRowTable).map(Record.fromTableRow)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records" in {
val sc = ScioContext(options)
implicit val coder = avroGenericRecordCoder(Record.avroSchema)
val schema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "NULLABLE", "name": "bool", "type": "BOOLEAN"},
| {"mode": "NULLABLE", "name": "int", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "long", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "float", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "double", "type": "FLOAT"},
| {"mode": "NULLABLE", "name": "string", "type": "STRING"},
| {"mode": "NULLABLE", "name": "byteString", "type": "BYTES"},
| {"mode": "NULLABLE", "name": "timestamp", "type": "INTEGER"},
| {"mode": "NULLABLE", "name": "date", "type": "STRING"},
| {"mode": "NULLABLE", "name": "time", "type": "STRING"},
| {"mode": "NULLABLE", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.bigQueryTable(tableRowTable, Format.GenericRecord)
.saveAsBigQueryTable(avroTable, schema = schema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).map(Record.fromAvro).value.toSet shouldBe records.toSet
it should "handle records as avro format" in {
implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
runWithRealContext(options) { sc =>
sc.parallelize(records)
.map(Record.toAvro)
.saveAsBigQueryTable(
avroTable,
schema = Record.schema, // This is a bad API. an avro schema should be expected
createDisposition = CREATE_IF_NEEDED
)
}.waitUntilFinish()

runWithRealContext(options) { sc =>
val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
data should containInAnyOrder(records)
}
}

it should "write GenericRecord records with logical types" in {
val sc = ScioContext(options)
// format: off
val schema: Schema = SchemaBuilder
.record("Record")
.namespace("com.spotify.scio.bigquery")
.fields()
.name("date").`type`(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).withDefault(0)
.name("time").`type`(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))).withDefault(0L)
.name("datetime").`type`().stringType().stringDefault("")
.endRecord()
// format: on

implicit val coder = avroGenericRecordCoder(schema)
val ltRecords: Seq[GenericRecord] =
Seq(
new GenericRecordBuilder(schema)
.set("date", 10)
.set("time", 1000L)
.set("datetime", "2020-08-03 11:11:11")
.build()
)

val tableSchema =
BigQueryUtil.parseSchema("""
|{
| "fields": [
| {"mode": "REQUIRED", "name": "date", "type": "DATE"},
| {"mode": "REQUIRED", "name": "time", "type": "TIME"},
| {"mode": "REQUIRED", "name": "datetime", "type": "STRING"}
| ]
|}
""".stripMargin)
val tap = sc
.parallelize(ltRecords)
.saveAsBigQueryTable(avroLogicalTypeTable, tableSchema, createDisposition = CREATE_IF_NEEDED)

val result = sc.run().waitUntilDone()
result.tap(tap).value.toList.size shouldBe 1
}

}
@@ -412,9 +412,11 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
q"$tree.wkt"
// for TableRow/json, use JSON to prevent escaping
q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
case t if t =:= typeOf[BigNumeric] =>
q"$tree.wkt"
// for TableRow/json, use string to avoid precision loss (like numeric)
q"$tree.wkt.toString"

case t if isCaseClass(c)(t) => // nested records
val fn = TermName("r" + t.typeSymbol.name)
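The two comments in the hunk above state the intent of this converter change: Json values are parsed into a Jackson `JsonNode` so that serializing the `TableRow` does not escape them, and BigNumeric values are rendered as strings to avoid precision loss. A standalone sketch of both points, using plain Jackson calls (this is not the generated macro code):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

val mapper  = new ObjectMapper()
val payload = """{"name":"Alice"}"""

// A Json value kept as a raw String gets escaped when the row is serialized:
mapper.writeValueAsString(mapper.createObjectNode().put("a", payload))
// {"a":"{\"name\":\"Alice\"}"}

// Parsed into a JsonNode (what the converter now emits for Json fields) it stays an object:
mapper.writeValueAsString(
  mapper.createObjectNode().set[ObjectNode]("a", mapper.readTree(payload))
)
// {"a":{"name":"Alice"}}

// For BigNumeric, keeping the value as a string sidesteps double rounding:
val big = "12.34567890123456789012345678901234567890"
big.toDouble.toString // 12.345678901234568  (precision lost)
big                   // all 38 decimal digits preserved
```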
@@ -17,6 +17,7 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
@@ -53,14 +54,20 @@ package object types {
case class Geography(wkt: String)

/**
* Case class to serve as raw type for Json instances to distinguish them from Strings.
* Case class to serve as raw type for Json instances.
*
* See also https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
*
* @param wkt
* Well Known Text formatted string that BigQuery displays for Json
*/
case class Json(wkt: String)
object Json {
private lazy val mapper = new ObjectMapper()

def apply(node: JsonNode): Json = Json(mapper.writeValueAsString(node))
def parse(json: Json): JsonNode = mapper.readTree(json.wkt)
}
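A quick round-trip through the two helpers added on the `Json` companion, as a sketch; the values in the comments assume Jackson's default compact output:

```scala
import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()
val node   = mapper.readTree("""{"name":"Alice","age":30}""")

val json = Json(node)       // Json("""{"name":"Alice","age":30}"""); wkt holds the compact string
val back = Json.parse(json) // a JsonNode equal to `node`
```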

/**
* Case class to serve as BigNumeric type to distinguish them from Numeric.
@@ -51,6 +51,12 @@ final class ConverterProviderSpec
.retryUntil(_.precision <= Numeric.MaxNumericPrecision)
.map(Numeric.apply)
}
implicit val arbJson: Arbitrary[Json] = Arbitrary(
for {
key <- Gen.alphaStr
value <- Gen.alphaStr
} yield Json(s"""{"$key":"$value"}""")
)
implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
implicit val eqInstant: Eq[Instant] = Eq.instance[Instant](_ == _)
@@ -17,6 +17,7 @@

package com.spotify.scio.bigquery.types

import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import com.spotify.scio.bigquery._
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
@@ -47,9 +48,26 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
}

it should "handle required json type" in {
val wkt = "{\"name\": \"Alice\", \"age\": 30}"
RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
val wkt = """{"name":"Alice","age":30}"""
val jsNodeFactory = new JsonNodeFactory(false)
val jackson = jsNodeFactory
.objectNode()
.set[ObjectNode]("name", jsNodeFactory.textNode("Alice"))
.set[ObjectNode]("age", jsNodeFactory.numberNode(30))

RequiredJson.fromTableRow(TableRow("a" -> jackson)) shouldBe RequiredJson(Json(wkt))
BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> jackson)
}

it should "handle required big numeric type" in {
val bigNumeric = "12.34567890123456789012345678901234567890"
val wkt = BigDecimal(bigNumeric)
RequiredBigNumeric.fromTableRow(TableRow("a" -> bigNumeric)) shouldBe RequiredBigNumeric(
BigNumeric(wkt)
)
BigQueryType.toTableRow(RequiredBigNumeric(BigNumeric(wkt))) shouldBe TableRow(
"a" -> bigNumeric
)
}

it should "handle case classes with methods" in {
@@ -66,6 +84,9 @@ object ConverterProviderTest {
@BigQueryType.toTable
case class RequiredJson(a: Json)

@BigQueryType.toTable
case class RequiredBigNumeric(a: BigNumeric)

@BigQueryType.toTable
case class Required(a: String)
