-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alternative implementation of a pubsub source #85
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
In #62 we added a feature so that bad rows are re-sized if their serialized length exceeds the maximum size allowed by the sink. This fixes a bug which meant the resizing was not working properly.
The health probe now reports unhealthy under these extra scenarios: - Bad row sink is unhealthy – cannot write to the sink - Fatal error happens when trying to write events to the lake
…mns (#66) Some query engines dislike Iceberg tables in which a STRUCT field is nullable and a nested field of the STRUCT is non-nullable. For example, in Snowflake we have seen errors like "SQL execution internal error" when the nested field contains a null. This PR adds a config option `respectIgluNullability`. When set to `false`, the Lake Loader will make all nested fields nullable. This is slightly less ideal for data storage, but it brings back compatibility with query engines like Snowflake. The default value of the new config option is `true` which maintains the behaviour of previous versions of the loader.
As part of this change we also bump spark to 3.5.x when using Hudi. And bump scala to 2.13.x. Previously we were pinned to earlier versions because of compatibility with Hudi 0.14.0. This PR is implemented in a way that we retain the flexibility of easily supporting a different version of Spark for the Hudi docker image. I anticipate we might need this flexibility if Iceberg/Delta are faster to add support for Spark 4.x.
Although we remain on Hudi 0.15.0, this bumps the AWS module of Hudi to version 1.x. This is safe because the AWS module is backwards compatible to 0.15.0. This change lets us use features only in 1.x like assumed role credentials provider.
Two features has been added in this commit: alerting and retrying For alerting, webhook method is used similar to other Snowplow apps. Alert message is sent to URL given in the config. Alerts are sent for some error cases, not for all of them. It is implemented such that it is sent only for setup errors. The error cases where alert sent can be extended in the future, of course. For retrying, two retry policies can be defined similar to Snowflake Loader. One of them is for setup errors and other one is for transient errors. Alert would be sent only for setup errors, not for transient errors. Also, possible setup error cases for Iceberg/Glue/S3 are added in this commit as well. Error cases for other destinations/table formats will be added later.
…72) It is possible to run the Hudi Lake Loader enabling the hudi option `"hoodie.datasource.hive_sync.enable": "true"` to register/sync the table to a Hive Metastore or Glue. However, with that setting enabled, the Hudi delays syncing until the first time events are committed. For use case, it is more helpful if the loader connects to Glue/Hive during startup, so we more quickly get an alert if the loader is missing permissions. This PR works my making the loader add an empty commit during startup. It does not add any parquet file, but it triggers the loader to sync the table to Glue/Hive.
The webhook alert should contain a short helpful message explaining why an error is caused by the destination setup. In other snowplow loaders we get the message simply by serializing the Exception. But in Lake Loader I found the exception messages to be very messy. In a related problem, for Hudi setup errors I needed to traverse the Exception's `getCause` in order to check if it was a setup error. This PR takes more explicit control of setting short friendly error messages, and traversing the `getCause` to get all relevant messages. E.g. an alert message before this change: > Failed to create events table: s3a://<REDACTED/events/_delta_log: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by V1ToV2AwsCredentialProviderAdapter : software.amazon.awssdk.services.sts.model.StsException: User: arn:aws:iam::<REDACTED>:user/<REDACTED> is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::<REDACTED>:role/<REDACTED> (Service: Sts, Status Code: 403, Request ID: 00000000-0000-0000-0000-000000000000) The corresponding alert after this change: > Failed to create events table: s3a://<REDACTED/events/_delta_log: Failed to initialize AWS access credentials: Missing permissions to assume the AWS IAM role **Other small changes I snuck into this commit:** - Added specific webhook alerts for Hudi. - Removed the AssumedRoleCredentialsProvider for aws sdk v1. This is no longer needed now that Hadoop is fully using aws sdk v2. - Fixed minor bug with retrying creating a database in Hudi Writer
The loader has a feature where it sends alerts to a webhook if the destination is mis-configured. However, with Iceberg it was possible for the initialization to be successful, and yet the loader might still fail later while committing events, e.g. due to a permissions error updating the Glue catalog. Here we add an extra step so the loader is forced to make an empty commit early during initialization. If there is an error committing, then the loader sends a webhook alert and never becomes healthy.
For KCL apps, we want the KCL to get initialized as early as possible, so the worker can claim shard leases before they get stolen by other workers. Before this PR, the loader initialized the destination table first, and then subscribed to the stream afterwards. Initializing the destination table can be fairly slow, especially because we do things like syncing to the external catalog, and possibly cleaning up aborted commits. After this PR, the loader subscribes to the stream concurrently with initializing the destination table. This lets the KCL claim leases before they get stolen.
In common-streams 0.8.x we shifted alerting / retrying / webhook out of the applications and into the common library. It also adds new features like heartbeat webhooks starting when the loader first becomes healthy.
Before this PR, the loader would generate a failed event if it failed to fetch a required schema from Iglu. However, all events have already passed validation in Enrich, so it is completely unexpected to have an Iglu failure. An Iglu error _probably_ means some type of configuration error or service outage. After this PR, the loader will crash and exit on an Iglu error, instead of creating a failed event. This is probably the preferred behaviour, while the pipeline operator addresses the underlying infrastructure problem. If an Iglu schema is genuinely now unavailable, then the pipeline operator can override the default behaviour by setting `exitOnMissingIgluSchema: false` in the configuration file or by listing the missing schema in `skipschemas`.
On rare occasions I have seen errors where Spark complains about creating two temporary tables with the same name. In the loader we create table names based on the window's start time. The error was unexpected because each window should have a different start time. I believe this is the fix. It ensures view name is computed right at the start of the window, and not after waiting for the table to be initialized. It prevents consecutive windows from picking the same timestamp in the case when the table is very slow to initialize.
The following improvements are introduced via common-streams 0.8.0-M4: - Fields starting with a digit are now prefixed with an underscore `_`. This is needed for Hudi, which does not allow fields starting with a digit (snowplow/schema-ddl#209) - New kinesis source implementation without fs2-kinesis (snowplow-incubator/common-streams#84) - Iglu schemas are resolved in parallel, for short pause times during event processing (snowplow-incubator/common-streams#85) - Common http client configured with restricted max connections per server (snowplow-incubator/common-streams#87) - Iglu scala client 3.2.0 no longer relies on the "list" schemas endpoint (snowplow/iglu-scala-client#255)
The Open Table Formats occasionally need to delete files as part of routine maintenance. For example, Delta deletes old log files, configured via table property `delta.logRetentionDuration`. For the Snowplow Lake Loader, this can mean deleting a very large number of files; bearing in mind this is a streaming loader that commits frequently. And deleting files from cloud storage is relatively slow. I observed that the loader could pause for several minutes on a single commit waiting for files to be deleted. Here I implement a customized Hadoop FileSystem where `delete` returns immediately and then operates asynchronously. This means deleting never blocks the loader's main fiber of execution. It is safe to run `delete` tasks asynchronously because the Open Table Formats do not have a hard requirement that files are deleted immediately.
common-streams 0.8.0-M5 includes the configuration option `maxPullsPerTransportChannel` for the pubsub source.
The pubsub Source from common-streams is a wrapper around `Subscriber` provided by the 3rd-party pubsub sdk. That `Subscriber` is a wrapper around a lower-level GRPC stub. This commit adds an alternative Source which directly wraps the GRPC stub, not the higher-level Subscriber. Compared with the previous Source implementation it has these differences in behaviour: - In the V1 source, ack extension periods were adjusted dynamically according to runtime heuristics of message processing times. In the V2 source, the ack extension period is a fixed configurable period. - The V1 source made a modack request (extending ack deadline) immediately after receiving any message. Whereas the V2 source does not modack a message unless its deadline is about the expire. - The V1 source periodically modacks all unacked messages currently held in memory. This is a problem for e.g. the Lake Loader which can have a very large number of unacked messages at any one time. The V2 source only modacks messages when they are approaching their ack deadline. - The V2 source uses a smaller thread pool for GRPC callbacks. The V1 source needed a very large thread pool to avoid deadlocks in setups that opened a large number of streaming pulls. If this experimental V2 Source is successful, it is likely to be the replacement of the V1 Source in a future release of common-streams.
Closing in favour of doing it in common-streams snowplow-incubator/common-streams#101 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The pubsub Source from common-streams is a wrapper around
Subscriber
provided by the 3rd-party pubsub sdk. ThatSubscriber
is a wrapper around a lower-level GRPC stub.This commit adds an alternative Source which directly wraps the GRPC stub, not the higher-level Subscriber.
Compared with the previous Source implementation it has these differences in behaviour:
If this experimental V2 Source is successful, it is likely to be the replacement of the V1 Source in a future release of common-streams.