diff --git a/README.md b/README.md index 4f9fb20800..f94edd126c 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ There are several key concepts in Snowplow: events (self-describing, structured) **Please, use up-to-date terms:** * _Self-describing event_, not _unstructured event_ * _Entities_, not _contexts_ (it’s ok-ish to refer to a set of entities as “context”, but only in a casual sense, as in “these provide some context to the event”) -* _Failed events_ and not _bad rows_ +* _Failed events_ and not _bad rows_, unless specifically referring to the legacy bad row JSON format and associated tooling * If you are writing about schemas, pick “schema” or “data structure” and stick with it **Please, do not over-explain these in any of your writing.** Instead, just link to one of the existing concept pages: diff --git a/docs/collecting-data/collecting-from-own-applications/java-tracker/custom-tracking-using-schemas/index.md b/docs/collecting-data/collecting-from-own-applications/java-tracker/custom-tracking-using-schemas/index.md index 1339e165e7..8d3cc73c5e 100644 --- a/docs/collecting-data/collecting-from-own-applications/java-tracker/custom-tracking-using-schemas/index.md +++ b/docs/collecting-data/collecting-from-own-applications/java-tracker/custom-tracking-using-schemas/index.md @@ -6,15 +6,15 @@ sidebar_position: 30 Self-describing (self-referential) JSON schemas are at the core of Snowplow tracking. Read more about them [here](/docs/understanding-your-pipeline/schemas/index.md). They allow you to track completely customised data, and are also used internally throughout Snowplow pipelines. -In all our trackers, self-describing JSON are used in two places. One is in the `SelfDescribing` event type that wraps custom self-describing JSONs for sending. The second use is to attach custom data to any tracked event. It's one of the most powerful Snowplow features. +In all our trackers, self-describing JSON are used in two places. One is in the `SelfDescribing` event type that wraps custom self-describing JSONs for sending. The second use is to attach custom data to any tracked event. It's one of the most powerful Snowplow features. -When tracking user behavior, the event describes the specific activity they performed, e.g. a user added an item to an eCommerce cart. To understand the meaning of the event, and how it relates to your business, it's ideal to also track the relatively persistent environment in which the activity was performed. For example, is the user a repeat customer? Which item did they add, and how many are in stock? +When tracking user behavior, the event describes the specific activity they performed, e.g. a user added an item to an eCommerce cart. To understand the meaning of the event, and how it relates to your business, it's ideal to also track the relatively persistent environment in which the activity was performed. For example, is the user a repeat customer? Which item did they add, and how many are in stock? These environmental factors can be tracked as the event "context", using self-describing JSON. When self-describing JSON are tracked as part of an event, they are called "entities". All the entities of an event together form the context. Read more in this [thorough blog post](https://snowplowanalytics.com/blog/2020/03/25/what-are-snowplow-events-and-entities-and-what-makes-them-so-powerful/). ### Adding custom entities to any event -Every `Event.Builder` in the Java tracker allows for a list of `SelfDescribingJson` objects to be added to the `Event`. 
It's fine to add multiple entities of the same type. There's no official limit to how many entities you can add to a single event, but consider if the payload size could become problematic if you are adding a large number. +Every `Event.Builder` in the Java tracker allows for a list of `SelfDescribingJson` objects to be added to the `Event`. It's fine to add multiple entities of the same type. There's no official limit to how many entities you can add to a single event, but consider if the payload size could become problematic if you are adding a large number. Context entities can be added to any event using the `customContext()` Builder method: ```java @@ -36,11 +36,11 @@ The Java tracker does not yet provide the ability to automatically assign entiti The Java tracker provides the `SelfDescribingJson` class for custom events and entities. There is no in-built distinction between schemas used for events and those used for entities: they can be used interchangably. -Your schemas must be accessible to your pipeline, within an [Iglu server](/docs/pipeline-components-and-applications/iglu/index.md). Tracked events containing self-describing JSON are validated against their schemas during the enrichment phase of the pipeline. If the data don't match the schema, the events end up in the Bad Rows storage instead of the data warehouse. +Your schemas must be accessible to your pipeline, within an [Iglu server](/docs/pipeline-components-and-applications/iglu/index.md). Tracked events containing self-describing JSON are validated against their schemas during the enrichment phase of the pipeline. If the data don't match the schema, the events end up as [failed events](/docs/understanding-your-pipeline/failed-events/index.md). A self-describing JSON needs two keys, `schema` and `data`. The `schema` key is the Iglu URI for the schema. The `data` value must match the properties described by the specified schema. It is usually provided as a map. -A simple initialisation looks like this: +A simple initialisation looks like this: ```java // This map will be used for the "data" key Map eventData = new HashMap<>(); diff --git a/docs/collecting-data/collecting-from-own-applications/java-tracker/previous-versions/java-tracker-v0-12/custom-tracking-using-schemas/index.md b/docs/collecting-data/collecting-from-own-applications/java-tracker/previous-versions/java-tracker-v0-12/custom-tracking-using-schemas/index.md index 2ebec34906..5e508bed61 100644 --- a/docs/collecting-data/collecting-from-own-applications/java-tracker/previous-versions/java-tracker-v0-12/custom-tracking-using-schemas/index.md +++ b/docs/collecting-data/collecting-from-own-applications/java-tracker/previous-versions/java-tracker-v0-12/custom-tracking-using-schemas/index.md @@ -38,7 +38,7 @@ The Java tracker does not yet provide the ability to automatically assign entiti The Java tracker provides the `SelfDescribingJson` class for custom events and entities. There is no in-built distinction between schemas used for events and those used for entities: they can be used interchangably. -Your schemas must be accessible to your pipeline, within an [Iglu server](/docs/pipeline-components-and-applications/iglu/index.md). Tracked events containing self-describing JSON are validated against their schemas during the enrichment phase of the pipeline. If the data don't match the schema, the events end up in the Bad Rows storage instead of the data warehouse. 
+Your schemas must be accessible to your pipeline, within an [Iglu server](/docs/pipeline-components-and-applications/iglu/index.md). Tracked events containing self-describing JSON are validated against their schemas during the enrichment phase of the pipeline. If the data don't match the schema, the events end up as [failed events](/docs/understanding-your-pipeline/failed-events/index.md). A self-describing JSON needs two keys, `schema` and `data`. The `schema` key is the Iglu URI for the schema. The `data` value must match the properties described by the specified schema. It is usually provided as a map. diff --git a/docs/collecting-data/collecting-from-own-applications/react-native-tracker/custom-tracking-using-schemas/index.md b/docs/collecting-data/collecting-from-own-applications/react-native-tracker/custom-tracking-using-schemas/index.md index bec02dc7a7..2cd2e2d21d 100644 --- a/docs/collecting-data/collecting-from-own-applications/react-native-tracker/custom-tracking-using-schemas/index.md +++ b/docs/collecting-data/collecting-from-own-applications/react-native-tracker/custom-tracking-using-schemas/index.md @@ -22,7 +22,7 @@ A Self Describing event is a [self-describing JSON](http://snowplowanalytics.com **Required properties** - `schema`: (string) – A valid Iglu schema path. This must point to the location of the custom event’s schema, of the format: `iglu:{vendor}/{name}/{format}/{version}`. -- `data`: (object) – The custom data for your event. This data must conform to the schema specified in the `schema` argument, or the event will fail validation and land in bad rows. +- `data`: (object) – The custom data for your event. This data must conform to the schema specified in the `schema` argument, or the event will fail validation and become a [failed event](/docs/understanding-your-pipeline/failed-events/index.md). To track a custom self-describing event, use the `trackSelfDescribingEvent` method of the tracker. diff --git a/docs/collecting-data/collecting-from-own-applications/react-native-tracker/previous-version/react-native-tracker-v0-reference/index.md b/docs/collecting-data/collecting-from-own-applications/react-native-tracker/previous-version/react-native-tracker-v0-reference/index.md index b4e8baf4e4..2b3db0e822 100644 --- a/docs/collecting-data/collecting-from-own-applications/react-native-tracker/previous-version/react-native-tracker-v0-reference/index.md +++ b/docs/collecting-data/collecting-from-own-applications/react-native-tracker/previous-version/react-native-tracker-v0-reference/index.md @@ -38,7 +38,7 @@ tracker.trackScreenViewEvent({screenName: 'myScreenName'}); In the previous 0.1.x releases, initializing the tracker was done differently. As an example describing the API change for a quick migration to v0.2.0: ```typescript -/* Previous API (v0.1.x) +/* Previous API (v0.1.x) import Tracker from '@snowplow/react-native-tracker'; // (a) const initPromise = Tracker.initialize({ // (b) @@ -273,7 +273,7 @@ tracker.trackSelfDescribingEvent({ **Required properties**: - `schema`: (string) – A valid Iglu schema path. This must point to the location of the custom event’s schema, of the format: `iglu:{vendor}/{name}/{format}/{version}`. -- `data`: (object) – The custom data for your event. This data must conform to the schema specified in the `schema` argument, or the event will fail validation and land in bad rows. +- `data`: (object) – The custom data for your event. 
This data must conform to the schema specified in the `schema` argument, or the event will fail validation and become a [failed event](/docs/understanding-your-pipeline/failed-events/index.md). To attach custom contexts, pass a second argument to the function, containing an array of self-describing JSON. diff --git a/docs/destinations/forwarding-events/elasticsearch/index.md b/docs/destinations/forwarding-events/elasticsearch/index.md index 9427a0e9e8..d1888f3a2d 100644 --- a/docs/destinations/forwarding-events/elasticsearch/index.md +++ b/docs/destinations/forwarding-events/elasticsearch/index.md @@ -217,7 +217,7 @@ The sink is configured using a HOCON file, for which you can find examples [her | output.good.cluster.documentType | Optional. The Elasticsearch index type. Index types are deprecated in ES >=7.x Therefore, it shouldn't be set with ES >=7.x | | output.good.chunk.byteLimit | Optional. Bulk request to Elasticsearch will be splitted to chunks according given byte limit. Default value 1000000. | | output.good.chunk.recordLimit | Optional. Bulk request to Elasticsearch will be splitted to chunks according given record limit. Default value 500. | -| output.bad.type | Required. Configure where to write bad rows. Can be "kinesis", "nsq", "stderr" or "none". | +| output.bad.type | Required. Configure where to write failed events. Can be "kinesis", "nsq", "stderr" or "none". | | output.bad.streamName | Required. Stream name for events which are rejected by Elasticsearch. | | output.bad.region | Used when `output.bad.type` is kinesis. Optional if it can be resolved with [AWS region provider chain](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html). Region where the bad Kinesis stream is located. | | output.bad.customEndpoint | Used when `output.bad.type` is kinesis. Optional. Custom endpoint to override AWS Kinesis endpoints, this can be used to specify local endpoints when using localstack. | diff --git a/docs/enriching-your-data/available-enrichments/custom-api-request-enrichment/index.md b/docs/enriching-your-data/available-enrichments/custom-api-request-enrichment/index.md index 6a7d455200..1a8daaa051 100644 --- a/docs/enriching-your-data/available-enrichments/custom-api-request-enrichment/index.md +++ b/docs/enriching-your-data/available-enrichments/custom-api-request-enrichment/index.md @@ -137,7 +137,7 @@ To disable `ttl` so keys could be stored in cache until job is done `0` valu #### `ignoreOnError` -When set to `true`, no bad row will be emitted if the API call fails and the enriched event will be emitted without the context added by this enrichment. +When set to `true`, no failed event will be emitted if the API call fails and the enriched event will be emitted without the context added by this enrichment. ### Data sources diff --git a/docs/enriching-your-data/available-enrichments/custom-sql-enrichment/index.md b/docs/enriching-your-data/available-enrichments/custom-sql-enrichment/index.md index d9f3972ae5..64d7a80e3b 100644 --- a/docs/enriching-your-data/available-enrichments/custom-sql-enrichment/index.md +++ b/docs/enriching-your-data/available-enrichments/custom-sql-enrichment/index.md @@ -171,7 +171,7 @@ A Snowplow enrichment can run many millions of time per hour, effectively launch #### `ignoreOnError` -When set to `true`, no bad row will be emitted if the SQL query fails and the enriched event will be emitted without the context added by this enrichment. 
+When set to `true`, no failed event will be emitted if the SQL query fails and the enriched event will be emitted without the context added by this enrichment. ## Examples @@ -279,7 +279,7 @@ This single context would be added to the `derived_contexts` array: { "SKU": "456", "prod_name": "Ray-Bans" - } + } ] } ``` diff --git a/docs/pipeline-components-and-applications/enrichment-components/configuration-reference/index.md b/docs/pipeline-components-and-applications/enrichment-components/configuration-reference/index.md index 020e68bd77..84617e4d75 100644 --- a/docs/pipeline-components-and-applications/enrichment-components/configuration-reference/index.md +++ b/docs/pipeline-components-and-applications/enrichment-components/configuration-reference/index.md @@ -33,7 +33,7 @@ license { | `monitoring.metrics.stdout.prefix` | Optional. Default: `snowplow.enrich`. Prefix for the metrics appearing in the logs. | | `telemetry.disable` | Optional. Set to `true` to disable [telemetry](/docs/getting-started-on-community-edition/telemetry/index.md). | | `telemetry.userProvidedId` | Optional. See [here](/docs/getting-started-on-community-edition/telemetry/index.md#how-can-i-help) for more information. | -| `featureFlags.acceptInvalid` | Optional. Default: `false`. Enrich *3.0.0* introduces the validation of the enriched events against atomic schema before emitting. If set to `false`, a bad row will be emitted instead of the enriched event if validation fails. If set to `true`, invalid enriched events will be emitted, as before. | +| `featureFlags.acceptInvalid` | Optional. Default: `false`. Enrich *3.0.0* introduces the validation of the enriched events against atomic schema before emitting. If set to `false`, a failed event will be emitted instead of the enriched event if validation fails. If set to `true`, invalid enriched events will be emitted, as before. | | `featureFlags.legacyEnrichmentOrder` | Optional. Default: `false`. In early versions of `enrich-kinesis` and `enrich-pubsub` (>= *3.1.5*), the Javascript enrichment incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true to keep the erroneous behavior of those previous versions. | | `validation.atomicFieldsLimits` (since *4.0.0*) | Optional. For the defaults, see [here](https://github.com/snowplow/enrich/blob/master/modules/common/src/main/resources/reference.conf). Configuration for custom maximum atomic fields (strings) length. It's a map-like structure with keys being atomic field names and values being their max allowed length. | @@ -44,7 +44,7 @@ Instead of a message queue, it's also possible to read collector payloads from f | `input.type`| Required. Must be `FileSystem`. | | `input.dir`| Required. E.g. `/input/collectorPayloads/`. Directory containing collector payloads encoded with Thrift. | -Likewise, it's possible to write enriched events, pii events and bad rows to files instead of PubSub or Kinesis. +Likewise, it's possible to write enriched events, pii events and failed events to files instead of PubSub or Kinesis. To write enriched events to files: @@ -54,12 +54,12 @@ To write enriched events to files: | `output.good.file` | Required. E.g. `/output/enriched`. File where enriched events will be written. | | `output.good.maxBytes` | Optional. E.g. `1048576`. Maximum size of a file in bytes. Triggers file rotation. | -To write bad rows to files: +To write failed events to files: | parameter | description | |-|-| | `output.bad.type` | Required. Must be `FileSystem`. 
| -| `output.bad.file` | Required. E.g. `/output/badRows`. File where bad rows will be written. | +| `output.bad.file` | Required. E.g. `/output/bad`. File where failed events will be written. | | `output.bad.maxBytes` | Optional. E.g. `1048576`. Maximum size of a file in bytes. Triggers file rotation. | To write pii events to files: @@ -86,10 +86,10 @@ A minimal configuration file can be found on the [Github repo](https://github.co | `output.good.delayThreshold` | Optional. Default: `200 milliseconds`. Delay threshold to use for batching. After this amount of time has elapsed, before `maxBatchSize` and `maxBatchBytes` have been reached, messages from the buffer will be sent. | | `output.good.maxBatchSize` | Optional. Default: `1000` (PubSub maximum). Maximum number of messages sent within a batch. When the buffer reaches this number of messages they are sent. | | `output.good.maxBatchBytes` | Optional. Default: `8000000` (PubSub maximum is 10MB). Maximum number of bytes sent within a batch. When the buffer reaches this size messages are sent. | -| `output.bad.topic` | Required. E.g. `projects/example-project/topics/badrows`. Name of the PubSub topic that will receive the bad rows. | -| `output.bad.delayThreshold` | Same as `output.good.delayThreshold` for bad rows. | -| `output.bad.maxBatchSize` | Same as `output.good.maxBatchSize` for bad rows. | -| `output.bad.maxBatchBytes` | Same as `output.good.maxBatchBytes` for bad rows. | +| `output.bad.topic` | Required. E.g. `projects/example-project/topics/bad`. Name of the PubSub topic that will receive the failed events. | +| `output.bad.delayThreshold` | Same as `output.good.delayThreshold` for failed events. | +| `output.bad.maxBatchSize` | Same as `output.good.maxBatchSize` for failed events. | +| `output.bad.maxBatchBytes` | Same as `output.good.maxBatchBytes` for failed events. | | `output.pii.topic` | Optional. Example: `projects/test-project/topics/pii`. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output topic for writing a `pii_transformation` event. | | `output.pii.attributes` | Same as `output.good.attributes` for pii events. | | `output.pii.delayThreshold` | Same as `output.good.delayThreshold` for pii events. | @@ -123,14 +123,14 @@ A minimal configuration file can be found on the [Github repo](https://github.co | `output.good.throttledBackoffPolicy.maxBackoff` (since *3.4.1*) | Optional. Default: `1 second`. Maximum backoff before retrying when writing fails in case of throughput exceeded. Writing is retried forever. | | `output.good.recordLimit` | Optional. Default: `500` (maximum allowed). Limits the number of events in a single PutRecords request. Several requests are made in parallel. | | `output.good.customEndpoint` | Optional. E.g. `http://localhost:4566`. To use a custom Kinesis endpoint. | -| `output.bad.streamName` | Required. E.g. `bad`. Name of the Kinesis stream to write to the bad rows. | -| `output.bad.region` | Same as `output.good.region` for bad rows. | -| `output.bad.backoffPolicy.minBackoff` | Same as `output.good.backoffPolicy.minBackoff` for bad rows. | -| `output.bad.backoffPolicy.maxBackoff` | Same as `output.good.backoffPolicy.maxBackoff` for bad rows. | -| `output.bad.backoffPolicy.maxRetries` | Same as `output.good.backoffPolicy.maxRetries` for bad rows. | -| `output.bad.throttledBackoffPolicy.minBackoff` (since *3.4.1*) | Same as `output.good.throttledBackoffPolicy.minBackoff` for bad rows. 
| -| `output.bad.throttledBackoffPolicy.maxBackoff` (since *3.4.1*) | Same as `output.good.throttledBackoffPolicy.maxBackoff` for bad rows. | -| `output.bad.recordLimit` | Same as `output.good.recordLimit` for bad rows. | +| `output.bad.streamName` | Required. E.g. `bad`. Name of the Kinesis stream to write to the failed events. | +| `output.bad.region` | Same as `output.good.region` for failed events. | +| `output.bad.backoffPolicy.minBackoff` | Same as `output.good.backoffPolicy.minBackoff` for failed events. | +| `output.bad.backoffPolicy.maxBackoff` | Same as `output.good.backoffPolicy.maxBackoff` for failed events. | +| `output.bad.backoffPolicy.maxRetries` | Same as `output.good.backoffPolicy.maxRetries` for failed events. | +| `output.bad.throttledBackoffPolicy.minBackoff` (since *3.4.1*) | Same as `output.good.throttledBackoffPolicy.minBackoff` for failed events. | +| `output.bad.throttledBackoffPolicy.maxBackoff` (since *3.4.1*) | Same as `output.good.throttledBackoffPolicy.maxBackoff` for failed events. | +| `output.bad.recordLimit` | Same as `output.good.recordLimit` for failed events. | | `output.bad.customEndpoint` | Same as `output.good.customEndpoint` for pii events. | | `output.pii.streamName` | Optional. E.g. `pii`. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output stream for writing a `pii_transformation` event. | | `output.pii.region` | Same as `output.good.region` for pii events. | @@ -188,7 +188,7 @@ A minimal configuration file can be found on the [Github repo](https://github.co | `output.good.topic` | Required. Name of the NSQ topic that will receive the enriched events. | | `output.good.nsqdHost` | Required. The host name of nsqd application. | | `output.good.nsqdPort` | Required. The port number of nsqd application. | -| `output.bad.topic` | Required. Name of the NSQ topic that will receive the bad rows. | +| `output.bad.topic` | Required. Name of the NSQ topic that will receive the failed events. | | `output.bad.nsqdHost` | Required. The host name of nsqd application. | | `output.bad.nsqdPort` | Required. The port number of nsqd application. | | `output.pii.topic` | Optional. Name of the NSQ topic that will receive the pii events. | @@ -199,7 +199,7 @@ A minimal configuration file can be found on the [Github repo](https://github.co Enriched events are expected to match [atomic](https://github.com/snowplow/iglu-central/blob/master/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0) schema. However, until `3.0.0`, it was never checked that the enriched events emitted by enrich were valid. -If an event is not valid against `atomic` schema, a bad row should be emitted instead of the enriched event. +If an event is not valid against `atomic` schema, a [failed event](/docs/understanding-your-pipeline/failed-events/index.md) should be emitted instead of the enriched event. However, this is a breaking change, and we want to give some time to users to adapt, in case today they are working downstream with enriched events that are not valid against `atomic`. For this reason, this new validation was added as a feature that can be deactivated like that: @@ -214,9 +214,9 @@ It will be possible to know if the new validation would have had an impact by 2 1. A new metric `invalid_enriched` has been introduced. It reports the number of enriched events that were not valid against `atomic` schema. As the other metrics, it can be seen on stdout and/or StatsD. -2. 
Each time there is an enriched event invalid against `atomic` schema, a line will be logged with the bad row (add `-Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug` to the `JAVA_OPTS` to see it).
+2. Each time an enriched event is invalid against the `atomic` schema, a line containing the failed event will be logged (add `-Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug` to the `JAVA_OPTS` to see it).

-If `acceptInvalid` is set to `false`, a bad row will be emitted instead of the enriched event in case it's not valid against `atomic` schema.
+If `acceptInvalid` is set to `false`, a failed event will be emitted instead of the enriched event if it is not valid against the `atomic` schema.

When we'll know that all our customers don't have any invalid enriched events any more, we'll remove the feature flags and it will be impossible to emit invalid enriched events.

diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/index.md
index 7ca5a84e00..354df95375 100644
--- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/index.md
+++ b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/index.md
@@ -68,10 +68,10 @@ In order to load this data again from `failedInserts` to BigQuery you can use

Repeater has several important behavior aspects:

-- If a pulled record is not a valid Snowplow event, it will result into a `loader_recovery_error` bad row.
+- If a pulled record is not a valid Snowplow event, it will result in a `loader_recovery_error` failed event.
- If a pulled record is a valid event, Repeater will wait some time (15 minutes by default) after the `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job.
-- If the database responds with an error, the row will get transformed into a `loader_recovery_error` bad row.
-- All entities in the dead-letter bucket are valid Snowplow [bad rows](https://github.com/snowplow/snowplow-badrows).
+- If the database responds with an error, the row will get transformed into a `loader_recovery_error` failed event.
+- All entities in the dead-letter bucket are in the [bad rows format](https://github.com/snowplow/snowplow-badrows).
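+
+Everything Repeater writes to the dead-letter bucket is a self-describing JSON in the bad rows format. As a rough, minimal sketch of the shape (the schema URI version and field names here are indicative only — the authoritative definition is the `loader_recovery_error` schema in the snowplow-badrows repository linked above, and the values are placeholders):
+
+```json
+{
+  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/loader_recovery_error/jsonschema/1-0-0",
+  "data": {
+    "processor": { "artifact": "snowplow-bigquery-repeater", "version": "<version>" },
+    "failure": { "error": "<reason the insert could not be recovered>" },
+    "payload": "<the original failed insert, as received from the failedInserts subscription>"
+  }
+}
+```
+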
### Topics, subscriptions and message formats @@ -286,7 +286,7 @@ We recommend constantly running Repeater on a small / cheap node or Docker conta --config=/configs/bigquery.hocon \\ --resolver=/configs/resolver.json \\ --bufferSize=20 \\ # size of the batch to send to the dead-letter bucket - --timeout=20 \\ # duration after which bad rows will be sunk into the dead-letter bucket + --timeout=20 \\ # duration after which failed events will be sunk into the dead-letter bucket --backoffPeriod=900 \\ # seconds to wait before attempting an insert (calculated against etl_tstamp) --verbose # optional, for debugging only `} diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/images/bigquery-microservices-architecture.png b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/images/bigquery-microservices-architecture.png deleted file mode 100644 index f7ab3333cb..0000000000 Binary files a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/images/bigquery-microservices-architecture.png and /dev/null differ diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/index.md deleted file mode 100644 index fbe3d6ae41..0000000000 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/index.md +++ /dev/null @@ -1,288 +0,0 @@ ---- -title: "BigQuery Loader (0.3.x)" -date: "2020-03-11" -sidebar_position: 30 ---- - -Please be aware that we have identified a security vulnerability in BigQuery Repeater in this version, which we've fixed in version [0.4.2](/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/index.md). You can find more details on our [Discourse forum](https://discourse.snowplow.io/t/important-notice-snowplow-bigquery-loader-vulnerability-and-fix/3783). - -Snowplow supports streaming data into BigQuery in near real-time. - -In order to do this, you need to setup the [BigQuery Loader](https://github.com/snowplow-incubator/snowplow-bigquery-loader). This loads enriched events from the enriched Pub/Sub topic, and streams them into BigQuery. - -## Technical Architecture - -The available tools are: - -1. **Snowplow BigQuery Loader**, an [Apache Beam](https://beam.apache.org/) job that reads Snowplow enriched data from Google Pub/Sub, transforms it into BigQuery-friendly format and loads it. It also writes information about encountered data types into an auxiliary `typesTopic` Pub/Sub topic. -2. **Snowplow BigQuery Mutator**, a Scala app that reads the `typesTopic` (via `typesSubscription`) and performs table mutations to add new columns as required. -3. **Snowplow BigQuery Repeater**, a Scala app that reads `failedInserts` (caused by _mutation lag_) and tries to re-insert them into BigQuery after some delay, sinking failures into a dead-end bucket. -4. **Snowplow BigQuery Forwarder**, an alternative to Repeater implemented as an Apache Beam job. In most cases, we recommend using Repeater. - -![](images/bigquery-microservices-architecture.png) - -In addition it also includes a fourth microservice, the "forwarder". 
This has been replaced by the "repeater" - we recommend users setup the repeater rather than the forwarder. - -### Snowplow BigQuery Loader - -#### Overview - -An Apache Beam job intended to run on [Google Dataflow](https://cloud.google.com/dataflow/) and load enriched data from `enriched` Pub/Sub topic to Google BigQuery. - -#### Algorithm - -- Reads Snowplow enriched events from `input` Pub/Sub subscription. -- Uses the JSON transformer from the [Snowplow Scala Analytics SDK](https://github.com/snowplow/snowplow-scala-analytics-sdk) to convert those enriched events into JSONs. -- Uses [Iglu Client](https://github.com/snowplow/iglu-scala-client/) to fetch JSON schemas for self-describing events and entities. -- Uses [Iglu Schema DDL](https://github.com/snowplow/iglu/tree/master/0-common/schema-ddl) to transform self-describing events and entities into BigQuery format. -- Writes transformed data into BigQuery. -- Writes all encountered Iglu types into a `typesTopic`. -- Writes all data failed to be processed into a `badRows` topic. -- Writes data that succeeded to be transformed, but failed to be loaded into a `failedInserts` topic. - -### Snowplow BigQuery Mutator - -#### Overview - -This is a Scala app that reads data from the `typesTopic` via a `typesSubscription` and performs table mutations. - -#### Algorithm - -- Reads messages from `typesSubscription`. -- Finds out if a message contains a type that has not been encountered yet (by checking internal cache). -- If a message contains a new type, double-checks it with the connected BigQuery table. -- If the type is not in the table, fetches its JSON schema from Iglu Registry. -- Transforms JSON schema into BigQuery column definition. -- Adds the column to the connected BigQuery table. - -### Snowplow BigQuery Repeater - -A JVM application that reads a `failedInserts` subscription and tries to re-insert them into BigQuery to overcome mutation lag. - -#### Overview - -Repeater has several important behavior aspects: - -- If a pulled record is not a valid Snowplow event, it will result into a `loader_recovery_error` bad row. -- If a pulled record is a valid event, Repeater will wait some time (5 minutes by default) after the `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job. -- If the database responds with an error, the row will get transformed into a `loader_recovery_error` bad row. -- All entities in the dead-end bucket are valid Snowplow bad rows. - -#### Mutation lag - -Loader inserts data into BigQuery in near real-time. At the same time, it sinks `shredded_type` payloads into the `typesTopic` approximately every 5 seconds. It also can take up to 10-15 seconds for Mutator to fetch, parse the message and execute an `ALTER TABLE` statement against the table. - -If a new type arrives from `input` subscription in this period of time and Mutator fails to handle it, BigQuery will reject the row containing it and it will be sent to the `failedInserts` topic. This topic contains JSON objects _ready to be loaded into BigQuery_ (ie not canonical Snowplow Enriched event format). - -In order to load this data again from `failedInserts` to BigQuery you can use Repeater or Forwarder (see below). Both read a subscription from `failedInserts` and perform `INSERT` statements. - -### Snowplow BigQuery Forwarder - -Used for exactly the same purpose as Repeater, but uses Dataflow under the hood, which makes it suitable for very big amounts of data. 
At the same time, it has several important drawbacks compared with Repeater: - -- User needs to re-launch it manually when failed inserts appear. -- Otherwise, it could be extremely expensive to run a Dataflow job that idles 99.9% of the time (it cannot terminate as it is a streaming/infinite job1). -- There's no way to tell Forwarder that it should take a pause before inserting rows back. Without the pause there's a chance that Mutator doesn't get a chance to alter the table. -- Forwarder keeps retrying all inserts (default behavior for streaming Dataflow jobs), while Repeater has a dead-end GCS bucket. -- In order to debug a problem with Forwarder, operator needs to inspect Stackdriver logs. - -1Forwarder is a very generic and primitive Dataflow job. It could be launched using the standard Dataflow templates. But a standard template job cannot accept a subscription as a source, only a topic. That means the job must be running all the time and most of the time it will be idle. - -### Topics and message formats - -Snowplow BigQuery Loader uses Google Pub/Sub topics and subscriptions to store intermediate data and communicate between applications. - -- `input` subscription -- data enriched by Beam Enrich, in canonical `TSV+JSON format`; -- `typesTopic` -- all shredded types in `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payload encountered by Loader are sinked here with ~5 seconds interval; -- `typesSubscription` -- a subscription to `typesTopic` used by Mutator with `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payloads; -- `badRows` topic -- data that could not be processed by Loader due to Iglu Registry unavailability, formatted as `bad rows`; -- `failedInserts` topic -- data that has been successfully transformed by Loader, but failed loading to BigQuery usually due to mutation lag, formatted as `BigQuery JSON`. - -## Setup guide - -### Configuration file - -Loader, Mutator, Repeater and Forwarder accept the same configuration file with [iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema) schema, which looks like this: - -```json -{ - "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0", - "data": { - "name": "Alpha BigQuery test", - "id": "31b1559d-d319-4023-aaae-97698238d808", - - "projectId": "com-acme", - "datasetId": "snowplow", - "tableId": "events", - - "input": "enriched-good-sub", - "typesTopic": "bq-test-types", - "typesSubscription": "bq-test-types-sub", - "badRows": "bq-test-bad-rows", - "failedInserts": "bq-test-bad-inserts", - - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - }, - - "purpose": "ENRICHED_EVENTS" - } -} -``` - -- All topics and subscriptions (`input`, `typesTopic`, `typesSubscription`, `badRows` and `failedInserts`) are explained in the [topics and message formats](/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/index.md#topics-and-message-formats) section. -- `projectId` is used to group all resources (topics, subscriptions and BigQuery table). -- `datasetId` and `tableId` (along with `projectId`) specify the target BigQuery table. -- `name` is an arbitrary human-readable description of the storage target. -- `id` is a unique identificator in UUID format. 
-- `load` specifies the loading mode and is explained in the dedicated section below. -- `purpose` is a standard storage configuration. Its only valid value currently is `ENRICHED_EVENTS`. - -#### Loading mode - -BigQuery supports two loading APIs: - -- [Streaming inserts API](https://cloud.google.com/bigquery/streaming-data-into-bigquery) -- [Load jobs API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs) - -You can use the `load` property to configure Loader to use one of them. - -For example, the configuration for using **streaming inserts** can look like this: - -```json -{ - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - } -} -``` - -`retry` specifies if failed inserts (eg due to mutation lag) should be retried infinitely or sent straight to the `failedInserts` topic. If a row cannot be inserted, it will be re-tried indefinitely, which can throttle the whole job. In that case a restart might be required. - -The configuration for using **load jobs** can look like this: - -```json -{ - "load": { - "mode": "FILE_LOADS", - "frequency": 60000 - } -} -``` - -`frequency` specifies how often the load job should be performed, in seconds. Unlike the near-real-time **streaming inserts** API, load jobs are more batch-oriented. - -Load jobs do not support `retry` (and streaming inserts do not support `frequency`). - -It is generally recommended to stick with the **streaming jobs** API without retries and use [Repeater](/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-3-0/index.md#repeater) to recover data from `failedInserts`. However, the **load jobs** API is cheaper and generates fewer duplicates. - -#### Command line options - -All four apps accept a path to a config file as specified above, and to an Iglu resolver config. - -#### Loader - -Loader accepts two required arguments, one optional argument, and [any other](https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options) supported by Google Cloud Dataflow. - -```bash -$ ./snowplow-bigquery-loader \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional -``` - -The optional `labels` argument accepts a JSON with key-value pairs that will be used as [labels](https://cloud.google.com/compute/docs/labeling-resources) to the Cloud Dataflow job. This will be helpful if you are launching Loader as a Kubernetes job: because labels assigned to the job will not be replicated in the resulting Dataflow job. - -This can be launched from any machine authenticated to submit Dataflow jobs. - -#### Mutator - -Mutator has three subcommands: `listen`, `create` and `add-column`. - -##### `listen` - -`listen` is the primary command and is used to automate table migrations. - -```bash -$ ./snowplow-bigquery-mutator \ - listen - --config $CONFIG \ - --resolver $RESOLVER \ - --verbose # Optional, for debugging only -``` - -##### `add-column` - -`add-column` can be used once to add a column manually. This should eliminate the risk of mutation lag and the necessity to run a Repeater or Forwarder job. - -```bash -$ ./snowplow-bigquery-mutator \ - add-column \ - --config $CONFIG \ - --resolver $RESOLVER \ - --shred-property CONTEXTS \ --schema iglu:com.acme/app_context/jsonschema/1-0-0 -``` - -The specified schema must be present in one of the Iglu registries in the resolver configuration. - -##### `create` - -`create` creates an empty table with `atomic` structure. 
- -```bash -$ ./snowplow-bigquery-mutator \ - create \ - --config $CONFIG \ - --resolver $RESOLVER -``` - -### Repeater - -We recommend constantly running Repeater on a small / cheap node or Docker container. - -```bash -$ ./snowplow-bigquery-repeater \ - create \ - --config $CONFIG \ - --resolver $RESOLVER \ - --failedInsertsSub $FAILED_INSERTS_SUB \ - --deadEndBucket $DEAD_END_GCS \ # Must start with gcs:\\ prefix - --desperatesBufferSize 20 \ # Size of the batch to send to the dead-end bucket - --desperatesWindow 20 \ # Window duration after which bad rows will be sunk into the dead-end bucket - --backoffPeriod 900 # Seconds to wait before attempting a re-insert (calculated against etl_tstamp) -``` - -`desperatesBufferSize`, `desperatesWindow` and `backoffPeriod` are optional parameters. - -#### Forwarder - -Like Loader, Forwarder can be submitted from any machine authenticated to submit Dataflow jobs. - -```bash -$ ./snowplow-bigquery-forwarder \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional - --failedInsertsSub=$FAILED_INSERTS_SUB -``` - -Its only unique option is `failedInsertsSub`, which is a subscription (that must be created _upfront_) to the `failedInserts` topic. - -The `labels` argument works the same as with Loader. - -By convention both Dataflow jobs (Forwarder and Loader) accept CLI options with `=` symbol and camelCase, while Mutator and Repeater accept them in UNIX style (without `=`). - -### Docker support - -All four applications are available as Docker images. - -- `snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-loader:0.3.0` -- `snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-mutator:0.3.0` -- `snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-repeater:0.3.0` -- `snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-forwarder:0.3.0` - -### Partitioning - -During initial setup it is strongly recommended to [setup partitioning](https://cloud.google.com/bigquery/docs/creating-column-partitions) on the `derived_tstamp` property. Mutator's `create` command does not automatically add partitioning yet. diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/images/bigquery-microservices-architecture.png b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/images/bigquery-microservices-architecture.png deleted file mode 100644 index f7ab3333cb..0000000000 Binary files a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/images/bigquery-microservices-architecture.png and /dev/null differ diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/index.md deleted file mode 100644 index f9fe4af5b9..0000000000 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-4-0/index.md +++ /dev/null @@ -1,282 +0,0 @@ ---- -title: "BigQuery Loader (0.4.x)" -date: "2020-03-11" -sidebar_position: 20 ---- - -## Technical Architecture - -The available tools are: - -1. 
**Snowplow BigQuery Loader**, an [Apache Beam](https://beam.apache.org/) job that reads Snowplow enriched data from Google Pub/Sub, transforms it into BigQuery-friendly format and loads it. It also writes information about encountered data types into an auxiliary `typesTopic` Pub/Sub topic. -2. **Snowplow BigQuery Mutator**, a Scala app that reads the `typesTopic` (via `typesSubscription`) and performs table mutations to add new columns as required. -3. **Snowplow BigQuery Repeater**, a Scala app that reads `failedInserts` (caused by _mutation lag_) and tries to re-insert them into BigQuery after some delay, sinking failures into a dead-end bucket. -4. **Snowplow BigQuery Forwarder**, an alternative to Repeater implemented as an Apache Beam job. In most cases, we recommend using Repeater. - -![](images/bigquery-microservices-architecture.png) - -In addition it also includes a fourth microservice, the "forwarder". This has been replaced by the "repeater" - we recommend users setup the repeater rather than the forwarder. - -### Snowplow BigQuery Loader - -#### Overview - -An Apache Beam job intended to run on [Google Dataflow](https://cloud.google.com/dataflow/) and load enriched data from `enriched` Pub/Sub topic to Google BigQuery. - -#### Algorithm - -- Reads Snowplow enriched events from `input` Pub/Sub subscription. -- Uses the JSON transformer from the [Snowplow Scala Analytics SDK](https://github.com/snowplow/snowplow-scala-analytics-sdk) to convert those enriched events into JSONs. -- Uses [Iglu Client](https://github.com/snowplow/iglu-scala-client/) to fetch JSON schemas for self-describing events and entities. -- Uses [Iglu Schema DDL](https://github.com/snowplow/iglu/tree/master/0-common/schema-ddl) to transform self-describing events and entities into BigQuery format. -- Writes transformed data into BigQuery. -- Writes all encountered Iglu types into a `typesTopic`. -- Writes all data failed to be processed into a `badRows` topic. -- Writes data that succeeded to be transformed, but failed to be loaded into a `failedInserts` topic. - -### Snowplow BigQuery Mutator - -#### Overview - -This is a Scala app that reads data from the `typesTopic` via a `typesSubscription` and performs table mutations. - -#### Algorithm - -- Reads messages from `typesSubscription`. -- Finds out if a message contains a type that has not been encountered yet (by checking internal cache). -- If a message contains a new type, double-checks it with the connected BigQuery table. -- If the type is not in the table, fetches its JSON schema from Iglu Registry. -- Transforms JSON schema into BigQuery column definition. -- Adds the column to the connected BigQuery table. - -### Snowplow BigQuery Repeater - -A JVM application that reads a `failedInserts` subscription and tries to re-insert them into BigQuery to overcome mutation lag. - -#### Overview - -Repeater has several important behavior aspects: - -- If a pulled record is not a valid Snowplow event, it will result into a `loader_recovery_error` bad row. -- If a pulled record is a valid event, Repeater will wait some time (5 minutes by default) after the `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job. -- If the database responds with an error, the row will get transformed into a `loader_recovery_error` bad row. -- All entities in the dead-end bucket are valid Snowplow bad rows. - -#### Mutation lag - -Loader inserts data into BigQuery in near real-time. 
At the same time, it sinks `shredded_type` payloads into the `typesTopic` approximately every 5 seconds. It also can take up to 10-15 seconds for Mutator to fetch, parse the message and execute an `ALTER TABLE` statement against the table. - -If a new type arrives from `input` subscription in this period of time and Mutator fails to handle it, BigQuery will reject the row containing it and it will be sent to the `failedInserts` topic. This topic contains JSON objects _ready to be loaded into BigQuery_ (ie not canonical Snowplow Enriched event format). - -In order to load this data again from `failedInserts` to BigQuery you can use Repeater or Forwarder (see below). Both read a subscription from `failedInserts` and perform `INSERT` statements. - -### Snowplow BigQuery Forwarder - -Used for exactly the same purpose as Repeater, but uses Dataflow under the hood, which makes it suitable for very big amounts of data. At the same time, it has several important drawbacks compared with Repeater: - -- User needs to re-launch it manually when failed inserts appear. -- Otherwise, it could be extremely expensive to run a Dataflow job that idles 99.9% of the time (it cannot terminate as it is a streaming/infinite job1). -- There's no way to tell Forwarder that it should take a pause before inserting rows back. Without the pause there's a chance that Mutator doesn't get a chance to alter the table. -- Forwarder keeps retrying all inserts (default behavior for streaming Dataflow jobs), while Repeater has a dead-end GCS bucket. -- In order to debug a problem with Forwarder, operator needs to inspect Stackdriver logs. - -1Forwarder is a very generic and primitive Dataflow job. It could be launched using the standard Dataflow templates. But a standard template job cannot accept a subscription as a source, only a topic. That means the job must be running all the time and most of the time it will be idle. - -### Topics and message formats - -Snowplow BigQuery Loader uses Google Pub/Sub topics and subscriptions to store intermediate data and communicate between applications. - -- `input` subscription -- data enriched by Beam Enrich, in canonical `TSV+JSON format`; -- `typesTopic` -- all shredded types in `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payload encountered by Loader are sinked here with ~5 seconds interval; -- `typesSubscription` -- a subscription to `typesTopic` used by Mutator with `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payloads; -- `badRows` topic -- data that could not be processed by Loader due to Iglu Registry unavailability, formatted as `bad rows`; -- `failedInserts` topic -- data that has been successfully transformed by Loader, but failed loading to BigQuery usually due to mutation lag, formatted as `BigQuery JSON`. 
- -## Setup guide - -### Configuration file - -Loader, Mutator, Repeater and Forwarder accept the same configuration file with [iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema) schema, which looks like this: - -```json -{ - "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0", - "data": { - "name": "Alpha BigQuery test", - "id": "31b1559d-d319-4023-aaae-97698238d808", - - "projectId": "com-acme", - "datasetId": "snowplow", - "tableId": "events", - - "input": "enriched-good-sub", - "typesTopic": "bq-test-types", - "typesSubscription": "bq-test-types-sub", - "badRows": "bq-test-bad-rows", - "failedInserts": "bq-test-bad-inserts", - - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - }, - - "purpose": "ENRICHED_EVENTS" - } -} -``` - -- All topics and subscriptions (`input`, `typesTopic`, `typesSubscription`, `badRows` and `failedInserts`) are explained in the [topics and message formats](#topics-and-message-formats) section. -- `projectId` is used to group all resources (topics, subscriptions and BigQuery table). -- `datasetId` and `tableId` (along with `projectId`) specify the target BigQuery table. -- `name` is an arbitrary human-readable description of the storage target. -- `id` is a unique identificator in UUID format. -- `load` specifies the loading mode and is explained in the dedicated section below. -- `purpose` is a standard storage configuration. Its only valid value currently is `ENRICHED_EVENTS`. - -#### Loading mode - -BigQuery supports two loading APIs: - -- [Streaming inserts API](https://cloud.google.com/bigquery/streaming-data-into-bigquery) -- [Load jobs API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs) - -You can use the `load` property to configure Loader to use one of them. - -For example, the configuration for using **streaming inserts** can look like this: - -```json -{ - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - } -} -``` - -`retry` specifies if failed inserts (eg due to mutation lag) should be retried infinitely or sent straight to the `failedInserts` topic. If a row cannot be inserted, it will be re-tried indefinitely, which can throttle the whole job. In that case a restart might be required. - -The configuration for using **load jobs** can look like this: - -```json -{ - "load": { - "mode": "FILE_LOADS", - "frequency": 60000 - } -} -``` - -`frequency` specifies how often the load job should be performed, in seconds. Unlike the near-real-time **streaming inserts** API, load jobs are more batch-oriented. - -Load jobs do not support `retry` (and streaming inserts do not support `frequency`). - -It is generally recommended to stick with the **streaming jobs** API without retries and use [Repeater](#snowplow-bigquery-repeater) to recover data from `failedInserts`. However, the **load jobs** API is cheaper and generates fewer duplicates. - -### Command line options - -All four apps accept a path to a config file as specified above, and to an Iglu resolver config. - -#### Loader - -Loader accepts two required arguments, one optional argument, and [any other](https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options) supported by Google Cloud Dataflow. 
- -```bash -$ ./snowplow-bigquery-loader \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional -``` - -The optional `labels` argument accepts a JSON with key-value pairs that will be used as [labels](https://cloud.google.com/compute/docs/labeling-resources) to the Cloud Dataflow job. This will be helpful if you are launching Loader as a Kubernetes job: because labels assigned to the job will not be replicated in the resulting Dataflow job. - -This can be launched from any machine authenticated to submit Dataflow jobs. - -#### Mutator - -Mutator has three subcommands: `listen`, `create` and `add-column`. - -##### `listen` - -`listen` is the primary command and is used to automate table migrations. - -```bash -$ ./snowplow-bigquery-mutator \ - listen - --config $CONFIG \ - --resolver $RESOLVER \ - --verbose # Optional, for debugging only -``` - -##### `add-column` - -`add-column` can be used once to add a column manually. This should eliminate the risk of mutation lag and the necessity to run a Repeater or Forwarder job. - -```bash -$ ./snowplow-bigquery-mutator \ - add-column \ - --config $CONFIG \ - --resolver $RESOLVER \ - --shred-property CONTEXTS \ --schema iglu:com.acme/app_context/jsonschema/1-0-0 -``` - -The specified schema must be present in one of the Iglu registries in the resolver configuration. - -##### `create` - -`create` creates an empty table with `atomic` structure. - -```bash -$ ./snowplow-bigquery-mutator \ - create \ - --config $CONFIG \ - --resolver $RESOLVER -``` - -#### Repeater - -We recommend constantly running Repeater on a small / cheap node or Docker container. - -```bash -$ ./snowplow-bigquery-repeater \ - create \ - --config $CONFIG \ - --resolver $RESOLVER \ - --failedInsertsSub $FAILED_INSERTS_SUB \ - --deadEndBucket $DEAD_END_GCS \ # Must start with gcs:\\ prefix - --desperatesBufferSize 20 \ # Size of the batch to send to the dead-end bucket - --desperatesWindow 20 \ # Window duration after which bad rows will be sunk into the dead-end bucket - --backoffPeriod 900 # Seconds to wait before attempting a re-insert (calculated against etl_tstamp) -``` - -`desperatesBufferSize`, `desperatesWindow` and `backoffPeriod` are optional parameters. - -#### Forwarder - -Like Loader, Forwarder can be submitted from any machine authenticated to submit Dataflow jobs. - -```bash -$ ./snowplow-bigquery-forwarder \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional - --failedInsertsSub=$FAILED_INSERTS_SUB -``` - -Its only unique option is `failedInsertsSub`, which is a subscription (that must be created _upfront_) to the `failedInserts` topic. - -The `labels` argument works the same as with Loader. - -By convention both Dataflow jobs (Forwarder and Loader) accept CLI options with `=` symbol and camelCase, while Mutator and Repeater accept them in UNIX style (without `=`). - -### Docker support - -All four applications are available as Docker images. - -- `snowplow/snowplow-bigquery-loader:0.4.2` -- `snowplow/snowplow-bigquery-mutator:0.4.2` -- `snowplow/snowplow-bigquery-repeater:0.4.2` -- `snowplow/snowplow-bigquery-forwarder:0.4.2` - -### Partitioning - -During initial setup it is strongly recommended to [setup partitioning](https://cloud.google.com/bigquery/docs/creating-column-partitions) on the `derived_tstamp` property. Mutator's `create` command does not automatically add partitioning yet. 
diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/images/bigquery-microservices-architecture.png b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/images/bigquery-microservices-architecture.png deleted file mode 100644 index f7ab3333cb..0000000000 Binary files a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/images/bigquery-microservices-architecture.png and /dev/null differ diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/index.md deleted file mode 100644 index e0501b2af0..0000000000 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-5-0/index.md +++ /dev/null @@ -1,286 +0,0 @@ ---- -title: "BigQuery Loader (0.5.x)" -date: "2020-05-18" -sidebar_position: 10 ---- - -## Technical Architecture - -The available tools are: - -1. **Snowplow BigQuery Loader**, an [Apache Beam](https://beam.apache.org/) job that reads Snowplow enriched data from Google Pub/Sub, transforms it into BigQuery-friendly format and loads it. It also writes information about encountered data types into an auxiliary `typesTopic` Pub/Sub topic. -2. **Snowplow BigQuery Mutator**, a Scala app that reads the `typesTopic` (via `typesSubscription`) and performs table mutations to add new columns as required. -3. **Snowplow BigQuery Repeater**, a Scala app that reads `failedInserts` (caused by _mutation lag_) and tries to re-insert them into BigQuery after some delay, sinking failures into a dead-end bucket. -4. **[DEPRECATED] Snowplow BigQuery Forwarder**, an alternative to Repeater implemented as an Apache Beam job. This component has been deprecated from version 0.5.0. Please use Repeater instead. - -![](images/bigquery-microservices-architecture.png) - -In addition it also includes a fourth microservice, the "repeater". - -### Snowplow BigQuery Loader - -#### Overview - -An Apache Beam job intended to run on [Google Dataflow](https://cloud.google.com/dataflow/) and load enriched data from `enriched` Pub/Sub topic to Google BigQuery. - -#### Algorithm - -- Reads Snowplow enriched events from `input` Pub/Sub subscription. -- Uses the JSON transformer from the [Snowplow Scala Analytics SDK](https://github.com/snowplow/snowplow-scala-analytics-sdk) to convert those enriched events into JSONs. -- Uses [Iglu Client](https://github.com/snowplow/iglu-scala-client/) to fetch JSON schemas for self-describing events and entities. -- Uses [Iglu Schema DDL](https://github.com/snowplow/iglu/tree/master/0-common/schema-ddl) to transform self-describing events and entities into BigQuery format. -- Writes transformed data into BigQuery. -- Writes all encountered Iglu types into a `typesTopic`. -- Writes all data failed to be processed into a `badRows` topic. -- Writes data that succeeded to be transformed, but failed to be loaded into a `failedInserts` topic. - -### Snowplow BigQuery Mutator - -#### Overview - -This is a Scala app that reads data from the `typesTopic` via a `typesSubscription` and performs table mutations. - -#### Algorithm - -- Reads messages from `typesSubscription`. 
-- Finds out if a message contains a type that has not been encountered yet (by checking internal cache). -- If a message contains a new type, double-checks it with the connected BigQuery table. -- If the type is not in the table, fetches its JSON schema from Iglu Registry. -- Transforms JSON schema into BigQuery column definition. -- Adds the column to the connected BigQuery table. - -### Snowplow BigQuery Repeater - -A JVM application that reads a `failedInserts` subscription and tries to re-insert them into BigQuery to overcome mutation lag. - -#### Overview - -Repeater has several important behavioral aspects: - -- If a pulled record is not a valid Snowplow event, it will result into a `loader_recovery_error` bad row. -- If a pulled record is a valid event, Repeater will wait some time (5 minutes by default) after the `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job. -- If the database responds with an error, the row will get transformed into a `loader_recovery_error` bad row. -- All entities in the dead-end bucket are valid Snowplow bad rows. - -#### Mutation lag - -Loader inserts data into BigQuery in near real-time. At the same time, it sinks `shredded_type` payloads into the `typesTopic` approximately every 5 seconds. It also can take up to 10-15 seconds for Mutator to fetch, parse the message and execute an `ALTER TABLE` statement against the table. - -If a new type arrives from `input` subscription in this period of time and Mutator fails to handle it, BigQuery will reject the row containing it and it will be sent to the `failedInserts` topic. This topic contains JSON objects _ready to be loaded into BigQuery_ (ie not canonical Snowplow Enriched event format). - -In order to load this data again from `failedInserts` to BigQuery you can use Repeater or Forwarder (see below). Both read a subscription from `failedInserts` and perform `INSERT` statements. - -### [DEPRECATED] Snowplow BigQuery Forwarder - -This component has been deprecated from version 0.5.0. Please use Repeater instead. The documentation on Forwarder that follows is outdated and no longer maintained. It will be removed in future versions. - -Used for exactly the same purpose as Repeater, but uses Dataflow under the hood, which makes it suitable for very big amounts of data. At the same time, it has several important drawbacks compared with Repeater: - -- User needs to re-launch it manually when failed inserts appear. -- Otherwise, it could be extremely expensive to run a Dataflow job that idles 99.9% of the time (it cannot terminate as it is a streaming/infinite job1). -- There's no way to tell Forwarder that it should take a pause before inserting rows back. Without the pause there's a chance that Mutator doesn't get a chance to alter the table. -- Forwarder keeps retrying all inserts (default behavior for streaming Dataflow jobs), while Repeater has a dead-end GCS bucket. -- In order to debug a problem with Forwarder, operator needs to inspect Stackdriver logs. - -1Forwarder is a very generic and primitive Dataflow job. It could be launched using the standard Dataflow templates. But a standard template job cannot accept a subscription as a source, only a topic. That means the job must be running all the time and most of the time it will be idle. - -### Topics and message formats - -Snowplow BigQuery Loader uses Google Pub/Sub topics and subscriptions to store intermediate data and communicate between applications. 
- -- `input` subscription -- data enriched by Beam Enrich, in canonical `TSV+JSON format`; -- `typesTopic` -- all shredded types in `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payload encountered by Loader are sinked here with ~5 seconds interval; -- `typesSubscription` -- a subscription to `typesTopic` used by Mutator with `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payloads; -- `badRows` topic -- data that could not be processed by Loader due to Iglu Registry unavailability, formatted as `bad rows`; -- `failedInserts` topic -- data that has been successfully transformed by Loader, but failed loading to BigQuery usually due to mutation lag, formatted as `BigQuery JSON`. - -## Setup guide - -### Configuration file - -Loader, Mutator and Repeater (Forwarder has been deprecated) accept the same configuration file with [iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema) schema, which looks like this: - -```json -{ - "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0", - "data": { - "name": "Alpha BigQuery test", - "id": "31b1559d-d319-4023-aaae-97698238d808", - - "projectId": "com-acme", - "datasetId": "snowplow", - "tableId": "events", - - "input": "enriched-good-sub", - "typesTopic": "bq-test-types", - "typesSubscription": "bq-test-types-sub", - "badRows": "bq-test-bad-rows", - "failedInserts": "bq-test-bad-inserts", - - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - }, - - "purpose": "ENRICHED_EVENTS" - } -} -``` - -- All topics and subscriptions (`input`, `typesTopic`, `typesSubscription`, `badRows` and `failedInserts`) are explained in the [topics and message formats](#topics-and-message-formats) section. -- `projectId` is used to group all resources (topics, subscriptions and BigQuery table). -- `datasetId` and `tableId` (along with `projectId`) specify the target BigQuery table. -- `name` is an arbitrary human-readable description of the storage target. -- `id` is a unique identificator in UUID format. -- `load` specifies the loading mode and is explained in the dedicated section below. -- `purpose` is a standard storage configuration. Its only valid value currently is `ENRICHED_EVENTS`. - -#### Loading mode - -BigQuery supports two loading APIs: - -- [Streaming inserts API](https://cloud.google.com/bigquery/streaming-data-into-bigquery) -- [Load jobs API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs) - -You can use the `load` property to configure Loader to use one of them. - -For example, the configuration for using **streaming inserts** can look like this: - -```json -{ - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - } -} -``` - -`retry` specifies if failed inserts (eg due to mutation lag) should be retried infinitely or sent straight to the `failedInserts` topic. If a row cannot be inserted, it will be re-tried indefinitely, which can throttle the whole job. In that case a restart might be required. - -The configuration for using **load jobs** can look like this: - -```json -{ - "load": { - "mode": "FILE_LOADS", - "frequency": 60000 - } -} -``` - -`frequency` specifies how often the load job should be performed, in seconds. Unlike the near-real-time **streaming inserts** API, load jobs are more batch-oriented. 
- -Load jobs do not support `retry` (and streaming inserts do not support `frequency`). - -It is generally recommended to stick with the **streaming jobs** API without retries and use [Repeater](#snowplow-bigquery-repeater) to recover data from `failedInserts`. However, the **load jobs** API is cheaper and generates fewer duplicates. - -### Command line options - -All four apps accept a path to a config file as specified above, and to an Iglu resolver config. - -#### Loader - -Loader accepts two required arguments, one optional argument, and [any other](https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options) supported by Google Cloud Dataflow. - -```bash -$ ./snowplow-bigquery-loader \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional -``` - -The optional `labels` argument is an example of a Dataflow natively supported argument. It accepts a JSON with key-value pairs that will be used as [labels](https://cloud.google.com/compute/docs/labeling-resources) to the Cloud Dataflow job. - -This can be launched from any machine authenticated to submit Dataflow jobs. - -#### Mutator - -Mutator has three subcommands: `listen`, `create` and `add-column`. - -##### `listen` - -`listen` is the primary command and is used to automate table migrations. - -```bash -$ ./snowplow-bigquery-mutator \ - listen - --config $CONFIG \ - --resolver $RESOLVER \ - --verbose # Optional, for debugging only -``` - -##### `add-column` - -`add-column` can be used once to add a column manually. This should eliminate the risk of mutation lag and the necessity to run a Repeater or Forwarder job. - -```bash -$ ./snowplow-bigquery-mutator \ - add-column \ - --config $CONFIG \ - --resolver $RESOLVER \ - --shred-property CONTEXTS \ --schema iglu:com.acme/app_context/jsonschema/1-0-0 -``` - -The specified schema must be present in one of the Iglu registries in the resolver configuration. - -##### `create` - -`create` creates an empty table with `atomic` structure. - -```bash -$ ./snowplow-bigquery-mutator \ - create \ - --config $CONFIG \ - --resolver $RESOLVER -``` - -#### Repeater - -We recommend constantly running Repeater on a small / cheap node or Docker container. - -```bash -$ ./snowplow-bigquery-repeater \ - create \ - --config $CONFIG \ - --resolver $RESOLVER \ - --failedInsertsSub $FAILED_INSERTS_SUB \ - --deadEndBucket $DEAD_END_GCS \ # Must start with gcs:\\ prefix - --desperatesBufferSize 20 \ # Size of the batch to send to the dead-end bucket - --desperatesWindow 20 \ # Window duration after which bad rows will be sunk into the dead-end bucket - --backoffPeriod 900 # Seconds to wait before attempting a re-insert (calculated against etl_tstamp) -``` - -`desperatesBufferSize`, `desperatesWindow` and `backoffPeriod` are optional parameters. - -#### [DEPRECATED] Forwarder - -This component has been deprecated from version 0.5.0. Please use Repeater instead. The documentation on Forwarder that follows is outdated and no longer maintained. It will be removed in future versions. - -Like Loader, Forwarder can be submitted from any machine authenticated to submit Dataflow jobs. - -```bash -$ ./snowplow-bigquery-forwarder \ - --config=$CONFIG \ - --resolver=$RESOLVER - --labels={"key1":"val1","key2":"val2"} # optional - --failedInsertsSub=$FAILED_INSERTS_SUB -``` - -Its only unique option is `failedInsertsSub`, which is a subscription (that must be created _upfront_) to the `failedInserts` topic. 
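Such a subscription can be created ahead of time with `gcloud`, for example. In the sketch below the topic name comes from the example configuration above, while the subscription name is an arbitrary illustration:

```bash
# Hypothetical example: create a subscription on the failedInserts topic
# before launching Forwarder (or Repeater). The topic name is taken from the
# example config above; the subscription name is arbitrary.
gcloud pubsub subscriptions create bq-test-bad-inserts-sub \
  --topic=bq-test-bad-inserts
```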
- -The `labels` argument works the same as with Loader. - -By convention both Dataflow jobs (Forwarder and Loader) accept CLI options with `=` symbol and camelCase, while Mutator and Repeater accept them in UNIX style (without `=`). - -### Docker support - -All four applications are available as Docker images. - -- `snowplow/snowplow-bigquery-loader:0.5.0` -- `snowplow/snowplow-bigquery-mutator:0.5.0` -- `snowplow/snowplow-bigquery-repeater:0.5.0` -- `snowplow/snowplow-bigquery-forwarder:0.5.0` - -### Partitioning - -During initial setup it is strongly recommended to [setup partitioning](https://cloud.google.com/bigquery/docs/creating-column-partitions) on the `derived_tstamp` property. Mutator's `create` command does not automatically add partitioning yet. diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/images/bigquery-microservices-architecture.png b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/images/bigquery-microservices-architecture.png deleted file mode 100644 index f7ab3333cb..0000000000 Binary files a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/images/bigquery-microservices-architecture.png and /dev/null differ diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/index.md deleted file mode 100644 index 1d3eb4869b..0000000000 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/previous-versions/bigquery-loader-0-6-0/index.md +++ /dev/null @@ -1,315 +0,0 @@ ---- -title: "BigQuery Loader (0.6.x)" -date: "2021-10-06" -sidebar_position: 0 ---- - -## Technical Architecture - -The available tools are: - -1. **Snowplow BigQuery Loader**, an [Apache Beam](https://beam.apache.org/) job that reads Snowplow enriched data from Google Pub/Sub, transforms it into BigQuery-friendly format and loads it. It also writes information about encountered data types into an auxiliary `typesTopic` Pub/Sub topic. -2. **[EXPERIMENTAL] Snowplow BigQuery StreamLoader**, a standalone Scala app that can be deployed as an alternative to the Beam-based BigQuery Loader. It can be the better choice for smaller loads, where Dataflow adds unnecessary overhead. This component still has "experimental" status. -3. **Snowplow BigQuery Mutator**, a Scala app that reads the `typesTopic` (via `typesSubscription`) and performs table mutations to add new columns as required. -4. **Snowplow BigQuery Repeater**, a Scala app that reads `failedInserts` (caused by _mutation lag_) and tries to re-insert them into BigQuery after some delay, sinking failures into a dead-end bucket. -5. **[DEPRECATED] Snowplow BigQuery Forwarder**, an alternative to Repeater implemented as an Apache Beam job. This component has been deprecated from version 0.5.0. Please use Repeater instead. - -![This image has an empty alt attribute; its file name is bigquery-microservices-architecture.png](images/bigquery-microservices-architecture.png) - -### Snowplow BigQuery Loader - -#### Overview - -An Apache Beam job intended to run on [Google Dataflow](https://cloud.google.com/dataflow/) and load enriched data from `enriched` Pub/Sub topic to Google BigQuery. 
- -#### Algorithm - -- Reads Snowplow enriched events from `input` Pub/Sub subscription. -- Uses the JSON transformer from the [Snowplow Scala Analytics SDK](https://github.com/snowplow/snowplow-scala-analytics-sdk) to convert those enriched events into JSONs. -- Uses [Iglu Client](https://github.com/snowplow/iglu-scala-client/) to fetch JSON schemas for self-describing events and entities. -- Uses [Iglu Schema DDL](https://github.com/snowplow/iglu/tree/master/0-common/schema-ddl) to transform self-describing events and entities into BigQuery format. -- Writes transformed data into BigQuery. -- Writes all encountered Iglu types into a `typesTopic`. -- Writes all data failed to be processed into a `badRows` topic. -- Writes data that succeeded to be transformed, but failed to be loaded into a `failedInserts` topic. - -### Snowplow BigQuery StreamLoader - -#### Overview - -An alternative to Loader, which does not require Google Dataflow. - -#### Algorithm - -It has the same algorithm as Loader: - -- Reads Snowplow enriched events from `input` Pub/Sub subscription. -- Uses the JSON transformer from the [Snowplow Scala Analytics SDK](https://github.com/snowplow/snowplow-scala-analytics-sdk) to convert those enriched events into JSONs. -- Uses [Iglu Client](https://github.com/snowplow/iglu-scala-client/) to fetch JSON schemas for self-describing events and entities. -- Uses [Iglu Schema DDL](https://github.com/snowplow/iglu/tree/master/0-common/schema-ddl) to transform self-describing events and entities into BigQuery format. -- Writes transformed data into BigQuery. -- Writes all encountered Iglu types into a `typesTopic`. -- Writes all data failed to be processed into a `badRows` topic. -- Writes data that succeeded to be transformed, but failed to be loaded into a `failedInserts` topic. - -### Snowplow BigQuery Mutator - -#### Overview - -This is a Scala app that reads data from the `typesTopic` via a `typesSubscription` and performs table mutations. - -#### Algorithm - -- Reads messages from `typesSubscription`. -- Finds out if a message contains a type that has not been encountered yet (by checking internal cache). -- If a message contains a new type, double-checks it with the connected BigQuery table. -- If the type is not in the table, fetches its JSON schema from Iglu Registry. -- Transforms JSON schema into BigQuery column definition. -- Adds the column to the connected BigQuery table. - -### Snowplow BigQuery Repeater - -A JVM application that reads a `failedInserts` subscription and tries to re-insert them into BigQuery to overcome mutation lag. - -#### Overview - -Repeater has several important behavioral aspects: - -- If a pulled record is not a valid Snowplow event, it will result into a `loader_recovery_error` bad row. -- If a pulled record is a valid event, Repeater will wait some time (5 minutes by default) after the `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job. -- If the database responds with an error, the row will get transformed into a `loader_recovery_error` bad row. -- All entities in the dead-end bucket are valid Snowplow bad rows. - -#### Mutation lag - -Loader inserts data into BigQuery in near real-time. At the same time, it sinks `shredded_type` payloads into the `typesTopic` approximately every 5 seconds. It also can take up to 10-15 seconds for Mutator to fetch, parse the message and execute an `ALTER TABLE` statement against the table. 
- -If a new type arrives from `input` subscription in this period of time and Mutator fails to handle it, BigQuery will reject the row containing it and it will be sent to the `failedInserts` topic. This topic contains JSON objects _ready to be loaded into BigQuery_ (ie not canonical Snowplow Enriched event format). - -In order to load this data again from `failedInserts` to BigQuery you can use Repeater or Forwarder (see below). Both read a subscription from `failedInserts` and perform `INSERT` statements. - -### [DEPRECATED] Snowplow BigQuery Forwarder - -This component has been deprecated from version 0.5.0. Please use Repeater instead. The documentation on Forwarder that follows is outdated and no longer maintained. It will be removed in future versions. - -Used for exactly the same purpose as Repeater, but uses Dataflow under the hood, which makes it suitable for very big amounts of data. At the same time, it has several important drawbacks compared with Repeater: - -- User needs to re-launch it manually when failed inserts appear. -- Otherwise, it could be extremely expensive to run a Dataflow job that idles 99.9% of the time (it cannot terminate as it is a streaming/infinite job1). -- There's no way to tell Forwarder that it should take a pause before inserting rows back. Without the pause there's a chance that Mutator doesn't get a chance to alter the table. -- Forwarder keeps retrying all inserts (default behavior for streaming Dataflow jobs), while Repeater has a dead-end GCS bucket. -- In order to debug a problem with Forwarder, operator needs to inspect Stackdriver logs. - -1Forwarder is a very generic and primitive Dataflow job. It could be launched using the standard Dataflow templates. But a standard template job cannot accept a subscription as a source, only a topic. That means the job must be running all the time and most of the time it will be idle. - -### Topics and message formats - -Snowplow BigQuery Loader uses Google Pub/Sub topics and subscriptions to store intermediate data and communicate between applications. - -- `input` subscription -- data enriched by Beam Enrich, in canonical `TSV+JSON format`; -- `typesTopic` -- all shredded types in `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payload encountered by Loader are sinked here with ~5 seconds interval; -- `typesSubscription` -- a subscription to `typesTopic` used by Mutator with `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` self-describing payloads; -- `badRows` topic -- data that could not be processed by Loader due to Iglu Registry unavailability, formatted as `bad rows`; -- `failedInserts` topic -- data that has been successfully transformed by Loader, but failed loading to BigQuery usually due to mutation lag, formatted as `BigQuery JSON`. 
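If you provision these resources manually, they can be created with `gcloud`. The names in the sketch below match the example configuration in the setup guide that follows and are purely illustrative; the enriched topic itself normally already exists as the output of your Enrich job:

```bash
# Illustrative only: topic and subscription names match the example config in
# the setup guide below. The enriched topic name is an assumption; use whatever
# topic your Enrich job writes to.
gcloud pubsub topics create bq-test-types
gcloud pubsub topics create bq-test-bad-rows
gcloud pubsub topics create bq-test-bad-inserts
gcloud pubsub subscriptions create bq-test-types-sub --topic=bq-test-types
gcloud pubsub subscriptions create enriched-good-sub --topic=enriched-good
```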
- -## Setup guide - -### Configuration file - -Loader / StreamLoader, Mutator and Repeater (Forwarder has been deprecated) accept the same configuration file with [iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema) schema, which looks like this: - -```json -{ - "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0", - "data": { - "name": "Alpha BigQuery test", - "id": "31b1559d-d319-4023-aaae-97698238d808", - "projectId": "com-acme", - "datasetId": "snowplow", - "tableId": "events", - "input": "enriched-good-sub", - "typesTopic": "bq-test-types", - "typesSubscription": "bq-test-types-sub", - "badRows": "bq-test-bad-rows", - "failedInserts": "bq-test-bad-inserts", - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - }, - "purpose": "ENRICHED_EVENTS" - } -} -``` - -- All topics and subscriptions (`input`, `typesTopic`, `typesSubscription`, `badRows` and `failedInserts`) are explained in the [topics and message formats](#topics-and-message-formats) section. -- `projectId` is used to group all resources (topics, subscriptions and BigQuery table). -- `datasetId` and `tableId` (along with `projectId`) specify the target BigQuery table. -- `name` is an arbitrary human-readable description of the storage target. -- `id` is a unique identificator in UUID format. -- `load` specifies the loading mode and is explained in the dedicated section below. -- `purpose` is a standard storage configuration. Its only valid value currently is `ENRICHED_EVENTS`. - -#### Loading mode - -BigQuery supports two loading APIs: - -- [Streaming inserts API](https://cloud.google.com/bigquery/streaming-data-into-bigquery) -- [Load jobs API](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs) - -You can use the `load` property to configure Loader to use one of them. - -For example, the configuration for using **streaming inserts** can look like this: - -```json -{ - "load": { - "mode": "STREAMING_INSERTS", - "retry": false - } -} -``` - -`retry` specifies if failed inserts (eg due to mutation lag) should be retried infinitely or sent straight to the `failedInserts` topic. If a row cannot be inserted, it will be re-tried indefinitely, which can throttle the whole job. In that case a restart might be required. - -The configuration for using **load jobs** can look like this: - -```json -{ - "load": { - "mode": "FILE_LOADS", - "frequency": 60000 - } -} -``` - -`frequency` specifies how often the load job should be performed, in seconds. Unlike the near-real-time **streaming inserts** API, load jobs are more batch-oriented. - -Load jobs do not support `retry` (and streaming inserts do not support `frequency`). - -It is generally recommended to stick with the **streaming jobs** API without retries and use [Repeater](#snowplow-bigquery-repeater) to recover data from `failedInserts`. However, the **load jobs** API is cheaper and generates fewer duplicates. - -### Command line options - -All apps accept a path to a config file as specified above, and to an Iglu resolver config. - -#### Loader - -Loader accepts two required arguments and [any other](https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options) supported by Google Cloud Dataflow. 
- -```bash -$ ./snowplow-bigquery-loader \ - --config=$CONFIG \ - --resolver=$RESOLVER \ - --labels={"key1":"val1","key2":"val2"} # optional -``` - -The optional `labels` argument is an example of a Dataflow natively supported argument. It accepts a JSON with key-value pairs that will be used as [labels](https://cloud.google.com/compute/docs/labeling-resources) to the Cloud Dataflow job. - -This can be launched from any machine authenticated to submit Dataflow jobs. - -#### StreamLoader - -StreamLoader accepts the same two required arguments as Loader. - -```bash -$ ./snowplow-bigquery-streamloader \ - --config=$CONFIG \ - --resolver=$RESOLVER -``` - -StreamLoader is not a Dataflow job, so it doesn't support any of the optional parameters that Loader does. - -#### Mutator - -Mutator has three subcommands: `listen`, `create` and `add-column`. - -##### `listen` - -`listen` is the primary command and is used to automate table migrations. - -```bash -$ ./snowplow-bigquery-mutator \ - listen \ - --config $CONFIG \ - --resolver $RESOLVER \ - --verbose # Optional, for debugging only -``` - -##### `add-column` - -`add-column` can be used once to add a column manually. This should eliminate the risk of mutation lag and the necessity to run a Repeater or Forwarder job. - -```bash -$ ./snowplow-bigquery-mutator \ - add-column \ - --config $CONFIG \ - --resolver $RESOLVER \ - --shred-property CONTEXTS \ - --schema iglu:com.acme/app_context/jsonschema/1-0-0 -``` - -The specified schema must be present in one of the Iglu registries in the resolver configuration. - -##### `create` - -`create` creates an empty table with `atomic` structure. - -```bash -$ ./snowplow-bigquery-mutator \ - create \ - --config $CONFIG \ - --resolver $RESOLVER -``` - -#### Repeater - -We recommend constantly running Repeater on a small / cheap node or Docker container. - -```bash -$ ./snowplow-bigquery-repeater \ - create \ - --config $CONFIG \ - --resolver $RESOLVER \ - --failedInsertsSub $FAILED_INSERTS_SUB \ - --deadEndBucket $DEAD_END_GCS \ # Must start with gcs:\\ prefix - --desperatesBufferSize 20 \ # Size of the batch to send to the dead-end bucket - --desperatesWindow 20 \ # Window duration after which bad rows will be sunk into the dead-end bucket - --backoffPeriod 900 # Seconds to wait before attempting a re-insert (calculated against etl_tstamp) -``` - -`desperatesBufferSize`, `desperatesWindow` and `backoffPeriod` are optional parameters. - -#### [DEPRECATED] Forwarder - -This component has been deprecated from version 0.5.0. Please use Repeater instead. The documentation on Forwarder that follows is outdated and no longer maintained. It will be removed in future versions. - -Like Loader, Forwarder can be submitted from any machine authenticated to submit Dataflow jobs. - -```bash -$ ./snowplow-bigquery-forwarder \ - --config=$CONFIG \ - --resolver=$RESOLVER \ - --labels={"key1":"val1","key2":"val2"} # optional - --failedInsertsSub=$FAILED_INSERTS_SUB -``` - -Its only unique option is `failedInsertsSub`, which is a subscription (that must be created _upfront_) to the `failedInserts` topic. - -The `labels` argument works the same as with Loader. - -By convention both Dataflow jobs (Forwarder and Loader) accept CLI options with `=` symbol and camelCase, while Mutator and Repeater accept them in UNIX style (without `=`). 
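To make the difference concrete, here are the same two options in both styles, taken from the examples above:

```bash
# Dataflow jobs (Loader, Forwarder): '=' and camelCase
$ ./snowplow-bigquery-loader --config=$CONFIG --resolver=$RESOLVER

# Mutator and Repeater: UNIX style, no '='
$ ./snowplow-bigquery-mutator listen --config $CONFIG --resolver $RESOLVER
```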
- -### Docker support - -All applications are available as Docker images on Docker Hub under these names: - -- `snowplow/snowplow-bigquery-loader:0.6.`2 -- `snowplow/snowplow-bigquery-streamloader:0.6.`2 -- `snowplow/snowplow-bigquery-mutator:0.6.`2 -- `snowplow/snowplow-bigquery-repeater:0.6.`2 - -Mutator, Repeater and Streamloader are also available as fatjar files attached to [releases](https://github.com/snowplow-incubator/snowplow-bigquery-loader/releases) in the project's Github repository. - -### Partitioning - -During initial setup it is strongly recommended to [setup partitioning](https://cloud.google.com/bigquery/docs/creating-column-partitions) on the `derived_tstamp` property. Mutator's `create` command does not automatically add partitioning yet. diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/snowplow-bigquery-loader-configuration-reference/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/snowplow-bigquery-loader-configuration-reference/index.md index 0819cf916c..54287daf6a 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/snowplow-bigquery-loader-configuration-reference/index.md +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/snowplow-bigquery-loader-configuration-reference/index.md @@ -14,7 +14,7 @@ This is a complete list of the options that can be configured in the Snowplow Bi | `loader.input.subscription` | Required. Enriched events subscription consumed by Loader and StreamLoader, eg enriched-sub. | | `loader.output.good.datasetId` | Required. Specify the dataset to which the events table belongs, eg snowplow. | | `loader.output.good.tableId` | Required. The name of the events table, eg events. | -| `loader.output.bad.topic` | Required. The name of the topic where bad rows will be written, eg bad-topic. | +| `loader.output.bad.topic` | Required. The name of the topic where failed events will be written, eg bad-topic. | | `loader.output.types.topic` | Required. The name of the topic where observed types will be written, eg types-topic. | | `loader.output.failedInserts.topic` | Required. The name of the topic where failed inserts will be written, eg failed-inserts-topic. | | `mutator.input.subscription` | Required. A subscription on the loader.output.types.topic, eg types-sub. | diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-postgres-loader/postgres-loader-configuration-reference/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-postgres-loader/postgres-loader-configuration-reference/index.md index b25ba8798f..1be2e56e33 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-postgres-loader/postgres-loader-configuration-reference/index.md +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-postgres-loader/postgres-loader-configuration-reference/index.md @@ -6,7 +6,7 @@ sidebar_position: 0 This is a complete list of the options that can be configured in the postgres loader's HOCON config file. The [example configs in github](https://github.com/snowplow-incubator/snowplow-postgres-loader/tree/master/config) show how to prepare an input file. -
input.typeRequired. Can be "Kinesis", "PubSub" or "Local". Configures where input events will be read from.
input.streamNameRequired when input.type is Kinesis. Name of the Kinesis stream to read from.
input.regionRequired when input.type is Kinesis. AWS region in which the Kinesis stream resides.
input.initialPositionOptional. Used when input.type is Kinesis. Use "TRIM_HORIZON" (the default) to start streaming at the last untrimmed record in the shard, which is the oldest data record in the shard. Or use "LATEST" to start streaming just after the most recent record in the shard.
input.retrievalMode.typeOptional. When input.type is Kinesis, this sets the polling mode for retrieving records. Can be "FanOut" (the default) or "Polling".
input.retrievalMode.maxRecordsOptional. Used when input.retrievalMode.type is "Polling". Configures how many records are fetched in each poll of the kinesis stream. Default 10000.
input.projectIdRequired when input.type is PubSub. The name of your GCP project.
input.subscriptionIdRequired when input.type is PubSub. Id of the PubSub subscription to read events from
input.pathRequired when input.type is Local. Path for event source. It can be directory or file. If it is directory, all the files under given directory will be read recursively. Also, given path can be both absolute path or relative path w.r.t. executable.
output.good.hostRequired. Hostname of the postgres database.
output.good.portOptional. Port number of the postgres database. Default 5432.
output.good.databaseRequired. Name of the postgres database.
output.good.usernameRequired. Postgres role name to use when connecting to the database
output.good.passwordRequired. Password for the postgres user.
output.good.schemaRequired. The Postgres schema in which to create tables and write events.
output.good.sslModeOptional. Configures how the client and server agree on ssl protection. Default "REQUIRE"
output.bad.typeOptional. Can be "Kinesis", "PubSub", "Local" or "Noop". Configures where bad rows will be sent. Default is "Noop" which means bad rows will be discarded
output.bad.streamNameRequired when bad.type is Kinesis. Name of the Kinesis stream to write to.
output.bad.regionRequired when bad.type is Kinesis. AWS region in which the Kinesis stream resides.
output.bad.projectIdRequired when bad.type is PubSub. The name of your GCP project.
output.bad.topicIdRequired when bad.type is PubSub. Id of the PubSub topic to write bad rows to
output.bad.pathRequired when bad.type is Local. Path of the file to write bad rows
purposeOptional. Set this to "ENRICHED_EVENTS" (the default) when reading the stream of enriched events in tsv format. Set this to "JSON" when reading a stream of self-describing json, e.g. snowplow bad rows.
monitoring.metrics.cloudWatchOptional boolean, with default true. For kinesis input, this is used to disable sending metrics to cloudwatch.
+
| Parameter | Description |
| --- | --- |
| `input.type` | Required. Can be "Kinesis", "PubSub" or "Local". Configures where input events will be read from. |
| `input.streamName` | Required when `input.type` is Kinesis. Name of the Kinesis stream to read from. |
| `input.region` | Required when `input.type` is Kinesis. AWS region in which the Kinesis stream resides. |
| `input.initialPosition` | Optional. Used when `input.type` is Kinesis. Use "TRIM_HORIZON" (the default) to start streaming at the last untrimmed record in the shard, which is the oldest data record in the shard. Or use "LATEST" to start streaming just after the most recent record in the shard. |
| `input.retrievalMode.type` | Optional. When `input.type` is Kinesis, this sets the polling mode for retrieving records. Can be "FanOut" (the default) or "Polling". |
| `input.retrievalMode.maxRecords` | Optional. Used when `input.retrievalMode.type` is "Polling". Configures how many records are fetched in each poll of the Kinesis stream. Default 10000. |
| `input.projectId` | Required when `input.type` is PubSub. The name of your GCP project. |
| `input.subscriptionId` | Required when `input.type` is PubSub. ID of the PubSub subscription to read events from. |
| `input.path` | Required when `input.type` is Local. Path of the event source. It can be a directory or a file. If it is a directory, all files under it are read recursively. The path can be either absolute or relative to the executable. |
| `output.good.host` | Required. Hostname of the Postgres database. |
| `output.good.port` | Optional. Port number of the Postgres database. Default 5432. |
| `output.good.database` | Required. Name of the Postgres database. |
| `output.good.username` | Required. Postgres role name to use when connecting to the database. |
| `output.good.password` | Required. Password for the Postgres user. |
| `output.good.schema` | Required. The Postgres schema in which to create tables and write events. |
| `output.good.sslMode` | Optional. Configures how the client and server agree on SSL protection. Default "REQUIRE". |
| `output.bad.type` | Optional. Can be "Kinesis", "PubSub", "Local" or "Noop". Configures where failed events will be sent. Default is "Noop", which means failed events will be discarded. |
| `output.bad.streamName` | Required when `output.bad.type` is Kinesis. Name of the Kinesis stream to write to. |
| `output.bad.region` | Required when `output.bad.type` is Kinesis. AWS region in which the Kinesis stream resides. |
| `output.bad.projectId` | Required when `output.bad.type` is PubSub. The name of your GCP project. |
| `output.bad.topicId` | Required when `output.bad.type` is PubSub. ID of the PubSub topic to write failed events to. |
| `output.bad.path` | Required when `output.bad.type` is Local. Path of the file to write failed events to. |
| `purpose` | Optional. Set this to "ENRICHED_EVENTS" (the default) when reading the stream of enriched events in TSV format. Set this to "JSON" when reading a stream of self-describing JSON, e.g. Snowplow [bad rows](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.badrows). |
| `monitoring.metrics.cloudWatch` | Optional boolean, with default true. For Kinesis input, this can be used to disable sending metrics to CloudWatch. |
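To show how these options fit together, below is a minimal, hypothetical sketch of a PubSub-to-Postgres configuration. It only uses option paths listed above, with placeholder values; refer to the [example configs in github](https://github.com/snowplow-incubator/snowplow-postgres-loader/tree/master/config) for the authoritative structure and the full set of options.

```hocon
{
  # Read enriched events from a Pub/Sub subscription (placeholder names)
  "input": {
    "type": "PubSub",
    "projectId": "my-gcp-project",
    "subscriptionId": "enriched-sub"
  },
  "output": {
    # Write good events to Postgres (placeholder credentials)
    "good": {
      "host": "localhost",
      "port": 5432,
      "database": "snowplow",
      "username": "postgres",
      "password": "change-me",
      "schema": "atomic"
    }
    # output.bad is omitted, so it defaults to "Noop" and failed events are discarded
  },
  # Optional; "ENRICHED_EVENTS" is the default
  "purpose": "ENRICHED_EVENTS"
}
```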
#### Advanced options diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/aws-only/_index.mdx b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/aws-only/_index.mdx index 04f8a4c4ac..d44c0057ca 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/aws-only/_index.mdx +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/aws-only/_index.mdx @@ -16,7 +16,7 @@ output.bad.type (since 5.4.0) - Optional. Either kinesis or file, default value file. Type of bad output sink. When file, badrows are written as files under URI configured in output.path. + Optional. Either kinesis or file, default value file. Type of bad output sink. When file, failed events are written as files under URI configured in output.path. output.bad.streamName (since 5.4.0) diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-kafka/_index.mdx b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-kafka/_index.mdx index bdf4074312..d1661e1562 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-kafka/_index.mdx +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-kafka/_index.mdx @@ -20,7 +20,7 @@ output.bad.type - Optional. Either kafka or file, default value file. Type of bad output sink. When file, badrows are written as files under URI configured in output.path. + Optional. Either kafka or file, default value file. Type of bad output sink. When file, failed events are written as files under URI configured in output.path. output.bad.topicName diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-pubsub/_index.mdx b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-pubsub/_index.mdx index 571f941bee..68508e9536 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-pubsub/_index.mdx +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/transforming-enriched-data/reusable/transformer-pubsub/_index.mdx @@ -32,7 +32,7 @@ output.bad.type (since 5.4.0) - Optional. Either pubsub or file, default value file. Type of bad output sink. When file, badrows are written as files under URI configured in output.path. + Optional. Either pubsub or file, default value file. Type of bad output sink. When file, failed events are written as files under URI configured in output.path. 
output.bad.topic (since 5.4.0) diff --git a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/upgrade-guides/r32-upgrade-guide/index.md b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/upgrade-guides/r32-upgrade-guide/index.md index 6d5487d833..9664c89413 100644 --- a/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/upgrade-guides/r32-upgrade-guide/index.md +++ b/docs/pipeline-components-and-applications/loaders-storage-targets/snowplow-rdb-loader/upgrade-guides/r32-upgrade-guide/index.md @@ -4,7 +4,7 @@ date: "2020-03-06" sidebar_position: 700 --- -We recommend to go through the upgrade routine in several independent steps. After every step you should have a working pipeline. If something is not working or Shredder produces unexpected bad rows - please let us know. +We recommend to go through the upgrade routine in several independent steps. After every step you should have a working pipeline. If something is not working or Shredder produces unexpected failed events - please let us know. ## Updating assets @@ -17,14 +17,14 @@ We recommend to go through the upgrade routine in several independent steps. Aft ```yaml aws: emr: - ami_version: 5.19.0 # was 5.9.0; Required by RDB Shredder + ami_version: 5.19.0 # was 5.9.0; Required by RDB Shredder storage: versions: rdb_loader: 0.17.0 # was 0.16.0 rdb_shredder: 0.16.0 # was 0.15.0 ``` -At this point, your pipeline should be running with new assets as it was before, without automigrations and generated TSV. We recommend to test this setup and monitor shredded bad rows for one or two runs before proceeding to enabling automigrations. +At this point, your pipeline should be running with new assets as it was before, without automigrations and generated TSV. We recommend to test this setup and monitor shredded failed events for one or two runs before proceeding to enabling automigrations. ## Iglu Server @@ -44,7 +44,7 @@ After setting up the Iglu Server, don't forget to add it to your resolver config New RDB Shredder is still able to produce legacy JSON files. But automigrations can be applied only to tables where data is prepared as TSV. If you setup a new pipeline, you can generate only TSVs abandoning legacy DDLs (except `atomic.events` and `atomic.manifest`) and [JSONPaths](https://discourse.snowplow.io/t/jsonpaths-files-demystified/269) altogether. However, if you already have tables deployed which DDLs were generated manually or via old igluctl you will likely need to apply so called _tabular blacklisting_ to these tables. It means that Shredder will keep producing data with these schemas as JSONs and Loader won't be able to apply migrations to it. This is necessary because manually generated DDLs are not guaranteed to have predictable column order and the only way to map JSON values to respective columns is JSONPaths files. -[igluctl 0.7.0](https://discourse.snowplow.io/t/igluctl-0-7-0-released/3620) provides `rdbms table-check` subcommand that get schemas from Iglu Server, figures out what DDL the Loader would generate, then connects to Redshift and compares those DDLs with actual state of the table. +[igluctl 0.7.0](https://discourse.snowplow.io/t/igluctl-0-7-0-released/3620) provides `rdbms table-check` subcommand that get schemas from Iglu Server, figures out what DDL the Loader would generate, then connects to Redshift and compares those DDLs with actual state of the table. 
Every table that have an incompatible order will have to be "blacklisted" in Redshift storage target config (`redshift_config.json`). Here's an example of a black list containing several schemas from Iglu Central: diff --git a/docs/pipeline-components-and-applications/stream-collector/configure/index.md b/docs/pipeline-components-and-applications/stream-collector/configure/index.md index 66d6541162..b4c0c350d9 100644 --- a/docs/pipeline-components-and-applications/stream-collector/configure/index.md +++ b/docs/pipeline-components-and-applications/stream-collector/configure/index.md @@ -81,8 +81,8 @@ collector { | `collector.streams.bad.sqsBadBuffer` | Optional. Like the `sqsGoodBuffer` but for failed events. | | `collector.streams.{good,bad}.aws.accessKey` | Required. Set to `default` to use the default provider chain; set to `iam` to use AWS IAM roles; or set to `env` to use `AWS_ACCESS_KEY_ID` environment variable. | | `collector.streams.{good,bad}.aws.secretKey` | Required. Set to `default` to use the default provider chain; set to `iam` to use AWS IAM roles; or set to `env` to use `AWS_SECRET_ACCESS_KEY` environment variable. | -| `collector.streams.{good,bad}.maxBytes` (since *2.9.0*) | Optional. Default: `1000000` (1 MB). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation bad row is emitted instead. If SQS buffer is activated, `sqsMaxBytes` is used instead. | -| `collector.streams.{good,bad}.sqsMaxBytes` | Optional. Default: `192000` (192 kb). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation bad row is emitted instead. SQS has a record size limit of 256 kb, but records are encoded with Base64, which adds approximately 33% of the size, so we set the limit to `256 kb * 3/4`. | +| `collector.streams.{good,bad}.maxBytes` (since *2.9.0*) | Optional. Default: `1000000` (1 MB). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation failed event is emitted instead. If SQS buffer is activated, `sqsMaxBytes` is used instead. | +| `collector.streams.{good,bad}.sqsMaxBytes` | Optional. Default: `192000` (192 kb). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation failed event is emitted instead. SQS has a record size limit of 256 kb, but records are encoded with Base64, which adds approximately 33% of the size, so we set the limit to `256 kb * 3/4`. | | `collector.streams.{good,bad}.startupCheckInterval` (since *2.9.0*) | Optional. Default: `1 second`. When collector starts, it checks if Kinesis streams exist with `describeStreamSummary` and if SQS buffers exist with `getQueueUrl` (if configured). This is the interval for the calls. `/sink-health` is made healthy as soon as requests are successful or records are successfully inserted. | | `collector.streams.backoffPolicy.minBackoff` | Optional. Default: `3000`. Time (in milliseconds) for retrying sending to kinesis / SQS after failure. | | `collector.streams.backoffPolicy.maxBackoff` | Optional. Default: `600000`. Time (in milliseconds) for retrying sending to kinesis / SQS after failure. | @@ -101,7 +101,7 @@ collector { | `collector.streams.{good,bad}.threadPoolSize` | Optional. Default: `10`. Thread pool size used by the collector sink for asynchronous operations. | | `collector.streams.{good,bad}.aws.accessKey` | Required. 
Set to `default` to use the default provider chain; set to `iam` to use AWS IAM roles; or set to `env` to use `AWS_ACCESS_KEY_ID` environment variable. | | `collector.streams.{good,bad}.aws.secretKey` | Required. Set to `default` to use the default provider chain; set to `iam` to use AWS IAM roles; or set to `env` to use `AWS_SECRET_ACCESS_KEY` environment variable. | -| `collector.streams.{good,bad}.maxBytes` (since *2.9.0*) | Optional. Default: `192000` (192 kb). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation bad row is emitted instead. SQS has a record size limit of 256 kb, but records are encoded with Base64, which adds approximately 33% of the size, so we set the limit to `256 kb * 3/4`. | +| `collector.streams.{good,bad}.maxBytes` (since *2.9.0*) | Optional. Default: `192000` (192 kb). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation failed event is emitted instead. SQS has a record size limit of 256 kb, but records are encoded with Base64, which adds approximately 33% of the size, so we set the limit to `256 kb * 3/4`. | | `collector.streams.{good,bad}.startupCheckInterval` (since *2.9.0*) | Optional. Default: `1 second`. When collector starts, it checks if SQS buffers exist with `getQueueUrl`. This is the interval for the calls. `/sink-health` is made healthy as soon as requests are successful or records are successfully inserted. | | `collector.streams.backoffPolicy.minBackoff` | Optional. Default: `3000`. Time (in milliseconds) for retrying sending to SQS after failure. | | `collector.streams.backoffPolicy.maxBackoff` | Optional. Default: `600000`. Time (in milliseconds) for retrying sending to SQS after failure. | @@ -123,7 +123,7 @@ collector { | `collector.streams.sink.{good,bad}.backoffPolicy.initialRpcTimeout` (since *2.5.0*) | Optional. Default: `10000`. Time (in milliseconds) before a RPC call to Pubsub is aborted and retried. | | `collector.streams.sink.{good,bad}.backoffPolicy.maxRpcTimeout` (since *2.5.0*) | Optional. Default: `10000`. Maximum time (in milliseconds) before RPC call to Pubsub is aborted and retried. | | `collector.streams.sink.{good,bad}.backoffPolicy.rpcTimeoutMultipler` (since *2.5.0*) | Optional. Default: `2`. How RPC timeouts increase as they are retried. | -| `collector.streams.sink.{good,bad}..maxBytes` (since *2.9.0*) | Optional. Default: `10000000` (10 MB). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation bad row is emitted instead. | +| `collector.streams.sink.{good,bad}..maxBytes` (since *2.9.0*) | Optional. Default: `10000000` (10 MB). Maximum number of bytes that a single record can contain. If a record is bigger, a size violation failed event is emitted instead. | | `collector.streams.sink.{good,bad}.startupCheckInterval` (since *2.9.0*) | Optional. Default: `1 second`. When collector starts, it checks if PubSub topics exist with `listTopics`. This is the interval for the calls. `/sink-health` is made healthy as soon as requests are successful or records are successfully inserted. | | `collector.streams.sink.{good,bad}.retryInterval` (since *2.9.0*) | Optional. Default: `10 seconds`. Collector uses built-in retry mechanism of PubSub API. In case of failure of these retries, the events are added to a buffer and every `retryInterval` collector retries to send them. | | `collector.streams.{good,bad}.buffer.byteLimit` | Optional. Default: `1000000`. 
Incoming events are stored in an internal buffer before being sent to Pubsub. This configures the maximum total size of pending events. |