docs: update docs for bigquery sink json support #183

Merged · 4 commits · Aug 5, 2022

Changes from all commits
16 changes: 11 additions & 5 deletions docs/docs/concepts/architecture.md
@@ -71,14 +71,20 @@ The final state of a message can be any one of the following after it is consumed
One can monitor this by plotting the metrics related to messages.

### Schema Handling
- When `INPUT_SCHEMA_DATA_TYPE` is set to `protobuf`
  - Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a protobuf schema.
  - Firehose deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts. The artifactory is an HTTP interface that Firehose uses to deserialize.
  - Schema handling, i.e. finding the mapped schema for the topic, downloading the descriptors, and dynamically being notified of/updated with the latest schema, is abstracted through the Stencil library.

    Stencil is a proprietary library that provides an abstraction layer for schema handling. Schema caching, dynamic schema updates, etc. are features of the Stencil client library.
- When `INPUT_SCHEMA_DATA_TYPE` is set to `json`
  - Currently this config is only supported in the Bigquery sink.
  - For JSON, the Bigquery sink infers the schema dynamically from the incoming data. In the future we plan to provide JSON schema support via Stencil.
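For illustration, a minimal sketch of how each mode is selected through the consumer configuration; the proto class below is a placeholder, not a default:

```text
# Protobuf input: records are deserialized using descriptors resolved via Stencil
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=io.odpf.firehose.consumer.TestMessage

# JSON input: currently supported only by the Bigquery sink; the schema is inferred from the data
# INPUT_SCHEMA_DATA_TYPE=json
```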



## Firehose Integration

3 changes: 2 additions & 1 deletion docs/docs/contribute/development.md
@@ -37,7 +37,7 @@ Configuration parameter variables of each sink can be found in the [Configuratio

### Schema Registry

When `INPUT_SCHEMA_DATA_TYPE` is set to `protobuf`, Firehose uses the Stencil Server as its Schema Registry for hosting Protobuf descriptors. The environment variable `SCHEMA_REGISTRY_STENCIL_ENABLE` must be set to `true`. The Stencil server URL must be specified in the variable `SCHEMA_REGISTRY_STENCIL_URLS`. The Proto Descriptor Set file of the Kafka messages must be uploaded to the Stencil server.

Refer [this guide](https://github.com/odpf/stencil/tree/master/server#readme) on how to set up and configure the Stencil server, and how to generate and upload Proto descriptor set file to the server.
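For a local setup, the registry wiring might look roughly like the sketch below; the descriptor-set URL is a placeholder and should point at your own Stencil deployment:

```text
INPUT_SCHEMA_DATA_TYPE=protobuf
SCHEMA_REGISTRY_STENCIL_ENABLE=true
# Placeholder URL of the descriptor set hosted on the Stencil server
SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8000/v1beta1/namespaces/firehose/schemas/test-messages
```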

@@ -71,6 +71,7 @@ Set the generic variables in the local.properties file.
```text
KAFKA_RECORD_PARSER_MODE = message
SINK_TYPE = log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS = io.odpf.firehose.consumer.TestMessage
```
Set the variables which specify the kafka server, topic name, and group-id of the kafka consumer - the standard values are used here.
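For a local Kafka broker these typically look something like the following sketch; the broker address, topic, and group id are placeholders, and the exact consumer group variable name should be verified against the generic configuration reference:

```text
SOURCE_KAFKA_BROKERS=localhost:9092
SOURCE_KAFKA_TOPIC=test-topic
# Consumer group variable name assumed here; verify it in the generic configuration reference
SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
```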
7 changes: 5 additions & 2 deletions docs/docs/guides/create_firehose.md
@@ -14,6 +14,7 @@ SOURCE_KAFKA_TOPIC=test-topic
SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
KAFKA_RECORD_PARSER_MODE=message
SINK_TYPE=log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=com.tests.TestMessage
```

@@ -129,8 +130,10 @@ _**Note:**_ [_**DATABASE**_](../sinks/influxdb-sink.md#sink_influx_db_name) _**a
## Create a Bigquery sink

- It requires the following [variables](../sinks/bigquery-sink.md) to be set; a minimal sketch is shown below the list.
- For `INPUT_SCHEMA_DATA_TYPE=protobuf`, this sink generates the Bigquery schema from the protobuf message schema and updates the Bigquery table with the latest generated schema.
  - A `google.protobuf.Timestamp` field in the protobuf message might be needed when table partitioning is enabled.
- For `INPUT_SCHEMA_DATA_TYPE=json`, this sink generates the Bigquery schema by inferring it from the incoming JSON. In the future we will also add support for JSON schemas coming from Stencil.
  - A timestamp column is needed in case of a partitioned table. It can be generated at ingestion time by setting the config `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE`; please refer to the [depot bigquery sink config section](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md#sink_bigquery_add_event_timestamp_enable).
- A Google Cloud credential with the required Bigquery permissions is needed to run this sink.
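As a sketch, a protobuf-based Bigquery Firehose could start from a configuration like the one below. The `SINK_BIGQUERY_*` variables live in the depot repository, so their exact names should be verified against the [Bigquery sink configuration reference](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md); the project, dataset, table, and credential path are placeholders:

```text
SINK_TYPE=bigquery
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=com.tests.TestMessage
# Depot Bigquery configs (names assumed; see the configuration reference)
SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID=my-gcp-project
SINK_BIGQUERY_DATASET_NAME=my_dataset
SINK_BIGQUERY_TABLE_NAME=my_table
SINK_BIGQUERY_CREDENTIAL_PATH=/path/to/service-account.json
```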

If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the [contribution guidelines](../contribute/contribution.md)
5 changes: 5 additions & 0 deletions docs/docs/introduction.md
@@ -19,6 +19,11 @@ Discover why users choose Firehose as their main Kafka Consumer
- **Runtime** Firehose can run inside containers or VMs in a fully managed runtime environment like Kubernetes.
- **Metrics** Always know what’s going on with your deployment with built-in monitoring of throughput, response times, errors, and more.

## Supported incoming data types from Kafka
- [Protobuf](https://developers.google.com/protocol-buffers)
- [JSON](https://www.json.org/json-en.html)
  - JSON support is currently limited to the Bigquery, Elasticsearch and MongoDB sinks. Support for other sinks will be added in the future.

## Supported Sinks:

Following sinks are supported in the Firehose
4 changes: 2 additions & 2 deletions docs/docs/reference/core-faqs.md
@@ -49,7 +49,7 @@ Firehose provides various Kafka client configurations. Refer [Generic Configurat

## What all data formats are supported ?

Elasticsearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema. For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema.

@@ -146,7 +146,7 @@ No, all fields from the input key/message will be sent by Firehose to the Sink.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema. Protobuf is much more lightweight that other schema formats like JSON, since it encodes the keys in the message to integers.

Elasticsearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema.

For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.

5 changes: 3 additions & 2 deletions docs/docs/reference/faq.md
@@ -771,15 +771,16 @@ section.

#### What all data formats are supported?

ElasticSearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema. For other sinks, we currently support
only Protobuf. Support for JSON and Avro is planned and incorporated in the roadmap.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serialising structured data.
When `INPUT_SCHEMA_DATA_TYPE=protobuf`, data streams on Kafka topics are bound to a Protobuf schema. Follow the instructions
in [this article](https://developers.google.com/protocol-buffers/docs/javatutorial) on how to create, compile and
serialize a Protobuf object to send it to a binary OutputStream.
Refer [this guide](https://developers.google.com/protocol-buffers/docs/proto3) for detailed Protobuf syntax and rules
to create a `.proto` file.
When `INPUT_SCHEMA_DATA_TYPE=json`, data streams on Kafka topics are expected to contain valid JSON messages.

#### Can we select particular fields from the input message?

48 changes: 45 additions & 3 deletions docs/docs/sinks/bigquery-sink.md
@@ -1,6 +1,48 @@
# BigQuery

A Bigquery sink Firehose \(`SINK_TYPE`=`bigquery`\) requires env variables to be set along with the generic ones, as well as the env variables defined in the depot repository. The Firehose sink uses the Bigquery implementation available in the [depot](https://github.com/odpf/depot) repository.
The Bigquery sink has several responsibilities: first, creating the Bigquery table and dataset when they do not exist; second, updating the Bigquery table schema based on the latest schema defined in Stencil or inferred from incoming data; third, translating incoming messages into Bigquery records and inserting them into Bigquery tables.
The sink uses the Bigquery [Streaming API](https://cloud.google.com/bigquery/streaming-data-into-bigquery) to insert records into Bigquery tables. For more info on the sink refer to the [Depot Bigquery sink documentation](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md).

## Asynchronous consumer mode

The Bigquery Streaming API limits the payload size of each insert operation. This limitation reduces the number of messages that can be inserted per request when messages are large, which in turn reduces the throughput of the Bigquery sink. To increase throughput, Firehose provides an asynchronous Kafka consumer mode.
In asynchronous mode the sink operation is executed asynchronously, so multiple sink tasks can be scheduled and run concurrently.
Throughput can be increased by increasing the size of the sink pool.
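A hedged sketch of enabling the asynchronous mode; the variable names below are assumptions and should be confirmed in the generic configuration reference:

```text
# Run sink operations asynchronously instead of in the consumer thread (assumed name)
SOURCE_KAFKA_CONSUMER_MODE=ASYNC
# Number of concurrent sink tasks in the pool (assumed name)
SINK_POOL_NUM_THREADS=4
```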

## At Least Once Guarantee

Because of the asynchronous consumer mode and the possibility of retries on insert operations, there is no guarantee of the order in which messages are successfully sent to the sink.
The same applies to offset commits: the offsets of processed messages do not arrive in order.
Firehose collects all the offsets, sorts them, and only commits the latest continuous offset.
This ensures that an offset is committed only after all messages up to it have been successfully processed, even when some messages are being reprocessed by the retry handler or when an insert operation takes a long time.

## Configurations
For the Bigquery sink in Firehose, first set \(`SINK_TYPE`=`bigquery`\). There are some generic configs that are common across different sink types and also need to be set, for example the Kafka consumer configs; these are mentioned in [generic.md](../advance/generic.md). Bigquery sink specific configs are mentioned in the depot [Depot-configuration/bigquery-sink.md section](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md)


## Bigquery table schema update
Refer to [Depot-bigquery.md#bigquery-table-schema-update section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#bigquery-table-schema-update)

## Protobuf and BigQuery table type mapping
For type conversion from Protobuf types to Bigquery types, please refer to the
[Depot-bigquery.md#protobuf-bigquery-table-type-mapping section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#protobuf---bigquery-table-type-mapping)

## Partitioning
The Bigquery sink supports creating tables with a partition configuration.
For more information refer to the [Depot-bigquery.md#partitioning section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#partitioning)

## Kafka Metadata
For data quality checking purposes, Kafka metadata sometimes needs to be added to the record. For more information refer to the [Depot-bigquery.md#metadata section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#metadata)
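As an illustration only, the metadata columns are typically switched on through depot configs along these lines; the names are assumptions to be confirmed in the linked section:

```text
# Append Kafka topic/partition/offset/timestamp columns to each record (assumed names)
SINK_BIGQUERY_ADD_METADATA_ENABLED=true
# Optional: nest the metadata under a single record column
SINK_BIGQUERY_METADATA_NAMESPACE=kafka_metadata
```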

## Default columns for json data type
With a dynamic schema for JSON, the table needs to be created with some default columns; for example, the partition key needs to be set at table creation time. Sample config: `SINK_BIGQUERY_DEFAULT_COLUMNS=event_timestamp=timestamp`. For more information refer to the [Depot-bigquery.md#default-columns-for-json-data-type section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#default-columns-for-json-data-type)
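For example, a JSON-input Bigquery sink with an ingestion-time partition column could combine the configs referenced here roughly as follows; only `SINK_BIGQUERY_DEFAULT_COLUMNS` and `SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE` are taken from these docs, the rest are the usual generic settings:

```text
SINK_TYPE=bigquery
INPUT_SCHEMA_DATA_TYPE=json
# Create the table with a typed event_timestamp column up front
SINK_BIGQUERY_DEFAULT_COLUMNS=event_timestamp=timestamp
# Populate event_timestamp with the ingestion time so it can serve as the partition key
SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE=true
```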

## Error handling
The response can contain multiple errors, which will be sent to Firehose from depot. Please refer to the [Depot-bigquery.md#errors-handling section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#errors-handling)


## Google Cloud Bigquery IAM Permission
Several IAM permissions are required for the Bigquery sink to run properly. For more details refer to the [Depot-bigquery.md#google-cloud-bigquery-iam-permission section](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#google-cloud-bigquery-iam-permission)


[Configuration of Bigquery Sink](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md)