BigQuery storage module #2780

Merged: 2 commits, Nov 25, 2021

2 changes: 2 additions & 0 deletions .github/release-drafter.yml
@@ -46,6 +46,8 @@ categories:
label: 'p:geode'
- title: 'Google Cloud BigQuery'
label: 'p:google-cloud-bigquery'
- title: 'Google Cloud BigQuery Storage'
label: 'p:google-cloud-bigquery-storage'
- title: 'Google Cloud Pub/Sub'
label: 'p:google-cloud-pub-sub'
- title: 'Google Cloud Pub/Sub gRPC'
1 change: 1 addition & 0 deletions .travis.yml
@@ -80,6 +80,7 @@ jobs:
env:
- PRE_CMD="docker-compose up -d geode"
- name: google-cloud-bigquery
- name: google-cloud-bigquery-storage
- name: google-cloud-pub-sub
env:
- PRE_CMD="docker-compose up -d gcloud-pubsub-emulator_prep"
29 changes: 26 additions & 3 deletions build.sbt
@@ -18,6 +18,7 @@ lazy val alpakka = project
geode,
googleCommon,
googleCloudBigQuery,
googleCloudBigQueryStorage,
googleCloudPubSub,
googleCloudPubSubGrpc,
googleCloudStorage,
@@ -79,9 +80,15 @@ lazy val alpakka = project
.filterNot(_.data.getAbsolutePath.contains("commons-net-3.1.jar"))
.filterNot(_.data.getAbsolutePath.contains("protobuf-java-2.6.1.jar"))
},
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject -- inProjects(`doc-examples`,
csvBench,
mqttStreamingBench),
ScalaUnidoc / unidoc / unidocProjectFilter := inAnyProject
-- inProjects(
`doc-examples`,
csvBench,
mqttStreamingBench,
// googleCloudPubSubGrpc and googleCloudBigQueryStorage contain the same gRPC generated classes
// don't include ScalaDocs for googleCloudBigQueryStorage to make it work
googleCloudBigQueryStorage
),
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)

@@ -180,6 +187,22 @@ lazy val googleCloudBigQuery = alpakkaProject(
Test / fork := true
).dependsOn(googleCommon).enablePlugins(spray.boilerplate.BoilerplatePlugin)

lazy val googleCloudBigQueryStorage = alpakkaProject(
"google-cloud-bigquery-storage",
"google.cloud.bigquery.storage",
Dependencies.GoogleBigQueryStorage,
akkaGrpcCodeGeneratorSettings ~= { _.filterNot(_ == "flat_package") },
akkaGrpcCodeGeneratorSettings += "server_power_apis",
akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client),
akkaGrpcGeneratedSources in Test := Seq(AkkaGrpc.Server),
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala, AkkaGrpc.Java),
Compile / scalacOptions ++= Seq(
"-P:silencer:pathFilters=akka-grpc/main",
"-P:silencer:pathFilters=akka-grpc/test"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
).dependsOn(googleCommon).disablePlugins(MimaPlugin).enablePlugins(AkkaGrpcPlugin)

lazy val googleCloudPubSub = alpakkaProject(
"google-cloud-pub-sub",
"google.cloud.pubsub",
156 changes: 156 additions & 0 deletions docs/src/main/paradox/google-cloud-bigquery-storage.md
@@ -0,0 +1,156 @@
# Google Cloud BigQuery Storage

The BigQuery Storage API offers fast access to BigQuery-managed storage using an [RPC-based](https://cloud.google.com/bigquery/docs/reference/storage/rpc) protocol.
It is an improvement over both the REST API and bulk-data `extract` jobs for accessing BigQuery-managed table data, but it doesn't offer any functionality for managing BigQuery resources.
Further information is available at the official [Google Cloud documentation website](https://cloud.google.com/bigquery/docs/reference/storage).

This connector communicates with the BigQuery Storage API via the gRPC protocol. The integration between Akka Streams and gRPC is handled by the
@extref:[Akka gRPC library](akka-grpc:/). Currently, this connector supports returning each row in Avro or Arrow format.

@@project-info{ projectId="google-cloud-bigquery-storage" }

## Artifacts

Akka gRPC uses Akka Discovery internally. Make sure to add Akka Discovery with the same Akka version that the application uses.

@@dependency [sbt,Maven,Gradle] {
group=com.lightbend.akka
artifact=akka-stream-alpakka-google-cloud-bigquery-storage_$scala.binary.version$
version=$project.version$
symbol2=AkkaVersion
value2=$akka.version$
group2=com.typesafe.akka
artifact2=akka-stream_$scala.binary.version$
version2=AkkaVersion
group3=com.typesafe.akka
artifact3=akka-discovery_$scala.binary.version$
version3=AkkaVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

@@dependencies { projectId="google-cloud-bigquery-storage" }

## Build setup

The Alpakka Google Cloud BigQuery Storage library contains the classes generated from [Google's protobuf specification](https://github.com/googleapis/java-bigquerystorage/tree/master/proto-google-cloud-bigquerystorage-v1).

@@@note { title="ALPN on JDK 8" }

HTTP/2 requires ALPN negotiation, which comes with the JDK starting with
version 8u251.

For older versions of the JDK you will need to load the `jetty-alpn-agent`
yourself, but we recommend upgrading.

@@@

## Configuration

The BigQuery Storage connector @ref[shares its basic configuration](google-common.md) with all the Google connectors in Alpakka.

Example test configuration:
```
alpakka.google.cloud.bigquery.grpc {
host = "localhost"
port = 21000
rootCa = "none"
callCredentials = "none"
}
```

For more configuration details, see the underlying configuration for @extref:[Akka gRPC](akka-grpc:/client/configuration.html).

A manually initialized @scala[`akka.stream.alpakka.googlecloud.bigquery.storage.scaladsl.GrpcBigQueryStorageReader`]@java[`akka.stream.alpakka.googlecloud.bigquery.storage.javadsl.GrpcBigQueryStorageReader`] can be used by providing it as an attribute to the stream:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #attributes }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #attributes }
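The `#attributes` snippets live in the module's test sources. As a rough sketch of the same idea, assuming the helper names used there (`BigQueryStorageSettings`, `BigQueryStorageAttributes`, and a `BigQueryStorage.create` entry point; treat the exact signatures as assumptions):

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.googlecloud.bigquery.storage.{ BigQueryStorageAttributes, BigQueryStorageSettings }
import akka.stream.alpakka.googlecloud.bigquery.storage.scaladsl.{ BigQueryStorage, GrpcBigQueryStorageReader }
import com.google.cloud.bigquery.storage.v1.DataFormat // generated enum; exact package depends on the codegen settings

implicit val system: ActorSystem = ActorSystem("bigquery-storage-docs")

// A reader pointed at a specific endpoint, e.g. a local mock server.
val reader = GrpcBigQueryStorageReader(BigQueryStorageSettings("localhost", 21000))

// Provide the manually initialized reader to the stream as an attribute.
val sourceWithReader = BigQueryStorage
  .create("projectId", "datasetId", "tableId", DataFormat.AVRO)
  .withAttributes(BigQueryStorageAttributes.reader(reader))
```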

## Reading

Data can be read in a number of ways. To read from a table, a read session must first be created.
When creating the session, you can specify the number of streams to use for transferring the data; spreading the transfer across several streams provides parallelism while ingesting the data and therefore better performance.
Creating a session also requires choosing a data format. The available options are Avro and Arrow.

If no `TableReadOptions` are specified, all of the table's columns are retrieved, as a `Source` containing a `Source` for each stream, each of which delivers a portion of the rows:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-all }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-all }
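For orientation, a minimal sketch of consuming such a source of sources, reusing the imports and `system` from the sketch above and assuming the outer elements pair the session schema with the per-stream sources:

```scala
import akka.stream.scaladsl.Sink

// The outer Source emits the session schema plus one inner Source per
// read stream; each inner Source delivers a slice of the rows.
val allRows = BigQueryStorage
  .create("projectId", "datasetId", "tableId", DataFormat.AVRO)
  .mapConcat { case (_, streams) => streams.toList } // keep only the per-stream sources
  .flatMapMerge(breadth = 4, identity) // consume up to 4 streams in parallel
  .runWith(Sink.seq)
```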

Secondly, by specifying [`TableReadOptions`](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#tablereadoptions), we can narrow down the amount of data returned, selecting which columns to fetch and/or applying a `row_restriction`. The latter is defined as:

> SQL text filtering statement, similar to a WHERE clause in a query. Currently, only a single predicate that is a comparison between a column and a constant value is supported. Aggregates are not supported.

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-options }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-options }
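A sketch with the generated `TableReadOptions` message; the field names follow the protobuf definition, while the scalapb-style constructor and the parameter position in `create` are assumptions:

```scala
// Generated message; the exact import path depends on the codegen settings.
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions

// Restrict the session to two columns and to rows matching a predicate.
val readOptions = TableReadOptions(
  selectedFields = Seq("stringField", "intField"),
  rowRestriction = "intField >= 5"
)

val filtered =
  BigQueryStorage.create("projectId", "datasetId", "tableId", DataFormat.AVRO, Some(readOptions))
```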

You can then choose to read and process these streams as they are or merged.
To process the streams merged into rows, you need to provide a `ByteString` `Unmarshaller` for the requested format.

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-merged }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-merged }
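A sketch of the merged variant; the `createMergedStreams` name is taken from the test sources and should be treated as an assumption, with the format-specific decoder left abstract:

```scala
import akka.http.scaladsl.unmarshalling.FromByteStringUnmarshaller
import akka.stream.alpakka.googlecloud.bigquery.storage.BigQueryRecord

// A ByteString unmarshaller for the requested format (Avro here); a real
// implementation would decode the Avro-encoded row batches into records.
implicit val avroUnmarshaller: FromByteStringUnmarshaller[List[BigQueryRecord]] = ???

val mergedRows =
  BigQueryStorage.createMergedStreams[List[BigQueryRecord]]("projectId", "datasetId", "tableId", DataFormat.AVRO)
```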

Or process the stream of rows individually:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-all }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-all }

Since Avro and Arrow are the available formats, streams can also be created for a specific format.

You can read Arrow Record streams merged:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-arrow-merged }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-arrow-merged }

You can read Arrow Record streams individually:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-arrow-all }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-arrow-all }
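Both Arrow variants sketched together, assuming a format-specific `BigQueryArrowStorage` helper with `readRecordsMerged`/`readRecords` methods (names are assumptions based on the snippets referenced above), reusing the implicit `system` from the first sketch:

```scala
import akka.stream.alpakka.googlecloud.bigquery.storage.scaladsl.BigQueryArrowStorage

// Merged: one source of records across all read streams.
val arrowMerged = BigQueryArrowStorage.readRecordsMerged("projectId", "datasetId", "tableId")

// Individual: per-stream record sources that can be run in parallel.
val arrowPerStream = BigQueryArrowStorage.readRecords("projectId", "datasetId", "tableId")
```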

You can read Avro Record streams merged:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-avro-merged }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-avro-merged }

You can read Avro Record streams individually:

Scala
: @@snip (/google-cloud-bigquery-storage/src/test/scala/docs/scaladsl/ExampleReader.scala) { #read-avro-all }

Java
: @@snip (/google-cloud-bigquery-storage/src/test/java/docs/javadsl/ExampleReader.java) { #read-avro-all }
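And the corresponding Avro sketch, under the same naming assumptions:

```scala
import akka.stream.alpakka.googlecloud.bigquery.storage.scaladsl.BigQueryAvroStorage

// The same two variants, reading Avro-formatted records.
val avroMerged = BigQueryAvroStorage.readRecordsMerged("projectId", "datasetId", "tableId")
val avroPerStream = BigQueryAvroStorage.readRecords("projectId", "datasetId", "tableId")
```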



## Running the test code

The tests use a [`BigQueryMockServer`](/google-cloud-bigquery-storage/src/test/scala/akka/stream/alpakka/bigquery/storage/mock/BigQueryMockServer.scala) that implements the server defined in the protobuf for the Storage API. It essentially provides a mock table to query.
Tests can be started from sbt by running:

sbt
: ```bash
> google-cloud-bigquery-storage/test
```
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
@@ -34,6 +34,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour
* [FTP](ftp.md)
* [Google Common](google-common.md)
* [Google Cloud BigQuery](google-cloud-bigquery.md)
* [Google Cloud BigQuery Storage](google-cloud-bigquery-storage.md)
* [Google Cloud Pub/Sub](google-cloud-pub-sub.md)
* [Google Cloud Pub/Sub gRPC](google-cloud-pub-sub-grpc.md)
* [Google Cloud Storage](google-cloud-storage.md)
29 changes: 29 additions & 0 deletions google-cloud-bigquery-storage/src/main/resources/reference.conf
@@ -0,0 +1,29 @@
alpakka.google {

credentials.default-scopes = ${?alpakka.google.credentials.default-scopes} ["https://www.googleapis.com/auth/bigquery.readonly"]

credentials {
provider = none
none {
project-id = "alpakka-google-test"
token = "yyyy.c.an-access-token"
}
}

retry-settings {
max-retries = 1
min-backoff = 50ms
max-backoff = 100ms
}

cloud.bigquery.grpc {
host = "bigquerystorage.googleapis.com"
port = 443

# Set to "none" to disable TLS
# TLS and certificates should be configured via the underlying
# SSL-config library
# https://lightbend.github.io/ssl-config/WSQuickStart.html#point-the-trust-manager-at-the-pem-file
rootCa = "none"
}
}
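These are the defaults; an application pointing the connector at a local emulator or mock server would typically override them in its own `application.conf`, mirroring the test configuration shown in the documentation above:

```
alpakka.google.cloud.bigquery.grpc {
  host = "localhost"
  port = 21000
  rootCa = "none"
}
```
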
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.googlecloud.bigquery.storage

import org.apache.avro.generic.GenericRecord

/** A format-agnostic view of a single row, keyed by column name. */
trait BigQueryRecord {

def get(column: String): Option[Object]

}

object BigQueryRecord {

def fromMap(map: Map[String, Object]): BigQueryRecord = new BigQueryRecordMapImpl(map)

def fromAvro(record: GenericRecord): BigQueryRecord = new BigQueryRecordAvroImpl(record)

}

case class BigQueryRecordAvroImpl(record: GenericRecord) extends BigQueryRecord {

override def get(column: String): Option[Object] = Option(record.get(column))

override def equals(that: Any): Boolean = that match {
case BigQueryRecordAvroImpl(thatRecord) => thatRecord.equals(record)
case _ => false
}

override def hashCode(): Int = record.hashCode()

}

case class BigQueryRecordMapImpl(map: Map[String, Object]) extends BigQueryRecord {

override def get(column: String): Option[Object] = map.get(column)

override def equals(that: Any): Boolean = that match {
case BigQueryRecordMapImpl(thatMap) => thatMap.equals(map)
case _ => false
}

override def hashCode(): Int = map.hashCode()

}