
Various features for 0.4.0 #62

Merged (8 commits into develop on Jun 3, 2024)
Conversation

@istreeter (Collaborator) commented on Jun 1, 2024

Jira ref: PDP-1221

Improve default Iceberg table properties

  • Explicitly collect column statistics for the most important timestamp columns
  • Enable the object storage location provider, which adds a hash component to file paths
  • Make icebergTableProperties configurable, so users can override the defaults (see the sketch after this list).
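
As a rough illustration (not the loader's exact shipped defaults), a user who wants to adjust the Iceberg defaults might set something like the following. The icebergTableProperties key comes from this PR; the surrounding values are assumptions for the example, and the property names are standard Iceberg table properties.

    # Hypothetical config fragment -- illustrative values, not the shipped defaults
    "icebergTableProperties": {
      # Collect full column statistics for the most important timestamp columns
      "write.metadata.metrics.column.load_tstamp": "full"
      "write.metadata.metrics.column.collector_tstamp": "full"
      # Object storage location provider: adds a hash component to data file paths
      "write.object-storage.enabled": "true"
      # The one property the loader already set explicitly before this PR
      "write.spark.accept-any-schema": "true"
    }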

Bad rows must not exceed the maximum size allowed by the sink

Bad rows are now re-sized if their serialized length exceeds the maximum size allowed by the sink.


Trap SIGTERM and start graceful shutdown

This loader tries to do a graceful shutdown. Unfortunately, some of the 3rd-party libraries start their own graceful shutdown as soon as the JVM begins to shut down. This is bad for the loader because it interferes with our delayed graceful shutdown.

This commit traps SIGTERM so that we explicitly start our own graceful shutdown before the JVM (and therefore the 3rd-party libraries) starts its own.


Improve default Hudi configuration settings

I have recently tested this loader with Hudi at high event volume. I believe I have found a good combination of Hudi configuration options that work well with this loader.

Selected highlights:

  • BULK_INSERT is the best write operation to use with this loader. It is more compatible with how we share the local Spark context across "write" tasks and "transform" tasks.
  • Clustering can be enabled safely if settings are chosen carefully. This lets Hudi combine small parquet files into larger files. The settings are chosen to keep memory requirements reasonable without impacting latency.
  • Hudi parallelism settings should be turned down to 1. This is more compatible with how this loader uses a small local Spark context, which is shared across "write" tasks and "transform" tasks. (An illustrative configuration sketch follows this list.)
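
The sketch below shows roughly how these highlights translate into Hudi options. The hoodie.* key names are standard Hudi configuration options, but the values and the hudiTableOptions wrapper (taken from this PR's config reference) are assumptions for illustration, not the exact defaults the loader ships.

    # Illustrative sketch only -- not the exact defaults added by this PR
    "hudiTableOptions": {
      # BULK_INSERT works best with the shared local Spark context
      "hoodie.datasource.write.operation": "bulk_insert"
      # Inline clustering combines small parquet files into larger ones
      "hoodie.clustering.inline": "true"
      "hoodie.clustering.inline.max.commits": "4"
      # Turn parallelism down to 1 for the small local Spark context
      "hoodie.bulkinsert.shuffle.parallelism": "1"
    }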

Enable syncing Hudi metadata to the Glue Catalog

Hudi has a feature in which it syncs the table's schema and partitions to the Glue catalog. This is helpful for users who want to query a Hudi table via AWS Athena.

This commit adds a missing dependency and config settings so the Hudi/Glue sync now works with this loader, if configured.
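
For illustration, enabling the sync might look roughly like the following, assuming the loader passes these through as Hudi options. The key names are standard Hudi hive-sync settings and the Glue sync tool class ships in the hudi-aws module, but the exact keys and values used by the loader are assumptions here.

    # Hypothetical sketch of enabling the Hudi -> Glue sync
    "hudiTableOptions": {
      "hoodie.datasource.hive_sync.enable": "true"
      # Hypothetical database name in the Glue catalog
      "hoodie.datasource.hive_sync.database": "snowplow"
      # Glue implementation of the sync tool, provided by the hudi-aws dependency
      "hoodie.meta.sync.client.tool.class": "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool"
    }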


Turn on metrics logging for Iceberg

By changing the log level of LoggingMetricsReporter to info we get an appropriate and helpful level of information in the logs when writing to Iceberg format.


Improve default Delta table properties

This commit changes the default table properties when creating a new table. The changes are only relevant when running the loader for the first time. The new defaults are based on our recent experience of loading to Delta at high event volume (a configuration sketch follows the list below).

  • delta.logRetentionDuration. This affects users who periodically run a compaction job on their Delta table. Previously, we kept old log files for 30 days beyond the compaction job. For high-volume pipelines, reducing this to 1 day reduces the number of log files Delta has to manage.
  • delta.dataSkippingStatsColumns. We want Delta to collect stats on the columns load_tstamp, collector_tstamp, derived_tstamp and dvce_created_tstamp. Previously we achieved this by moving those four columns to the far left of the table and then setting dataSkippingColumns = 4. Delta has a new option, dataSkippingStatsColumns, in which we can explicitly name the columns to index. It is better for the end user, because they can potentially alter the table to add any custom column to the list.
  • delta.checkpointInterval. By default, Delta creates a checkpoint every 10 commits. Because the Lake Loader commits frequently, and because it scales horizontally to multiple loaders, we have found improved efficiency by decreasing how often it writes a checkpoint.

This commit also sets the Spark option spark.databricks.delta.autoCompact.enabled to false. This is only needed in case the customer ever manually sets the table property delta.autoOptimize.autoCompact: it is important that we override the table property.
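
As a rough sketch, the new defaults amount to something like the table properties below. The dataSkippingStatsColumns and checkpointInterval values appear later in this PR's config reference; the logRetentionDuration syntax shown here is an assumption.

    # Illustrative sketch of the new Delta defaults described above
    "deltaTableProperties": {
      "delta.logRetentionDuration": "interval 1 days"
      "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp"
      "delta.checkpointInterval": "50"
    }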


Bump common-streams and iceberg to latest versions

In #60 I bumped common-streams to 0.7.0-M2. This PR consolidates on the final 0.7.0 release.

In #59 I bumped Iceberg to 1.5.1, but the Iceberg release notes strongly encourage updating to 1.5.2, so this PR moves to 1.5.2.

@istreeter force-pushed the various-features-for-0.4.0 branch from 0dc7a7b to be61051 on June 1, 2024, 19:43
istreeter added 8 commits on June 3, 2024, 10:42
@istreeter force-pushed the various-features-for-0.4.0 branch from be61051 to fee675f on June 3, 2024, 09:43
"delta.checkpointInterval": "50"
}

"hudiTableOptions": {
"icebergTableProperties": {
"write.spark.accept-any-schema": "true"

A reviewer commented:

Previously we only explicitly set "write.spark.accept-any-schema": "true", whereas the others are new. But as I understand it, these don't break the contract and should not affect the runtime 👍

istreeter (Collaborator, Author) replied:

They don't break the contract. Furthermore, these properties are only used when creating a new table for the first time, so they are only relevant for new users; they do not impact existing users upgrading from an earlier version.

@@ -48,6 +48,16 @@ class HudiWriter(config: Config.Hudi) extends Writer {
LOCATION '${config.location}'
TBLPROPERTIES($tableProps)
""")

// We call clean/archive during startup because it also triggers rollback of any previously

A reviewer commented:

A really interesting way of handling the initial state.

    # -- This can be blank in most setups because the loader already sets sensible defaults.
    "deltaTableProperties": {
      "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp"
      "delta.checkpointInterval": "50"

A reviewer commented:

I assume some of these will be exposed through config management. The loader won't be able to sanity-check this value (and possibly some others). Are we expected to have some sort of check on how this value is set versus, e.g., the commit frequency?

istreeter (Collaborator, Author) replied:

Lake Loader has several places where we give the open-source user a lot of power to set 3rd-party configuration properties. For example, every single Spark property can be overridden by the user, and the loader never sanity-checks any of these settings.

I like this approach for now, because the Lake Loader is still young and I am still learning what works well as default settings. My intention is to get good defaults so that most users under most circumstances never need to think about these advanced options. So even though it is configurable, I would not encourage anyone to change it.

If later we discover there is a setting that is relevant for many users... then we could document it, and possibly sanity-check it. E.g. if we discover a setting for which some users might want value X but other users might want value Y. On the other hand, if the 3rd-party documentation is already clear about an add-on feature, then maybe we would not document it ourselves.

I don't know if I've really answered your question there??
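
For example, overriding a Spark property through the loader configuration could look roughly like the sketch below. This is a hypothetical illustration: the "spark" / "conf" nesting is an assumption, not necessarily the loader's real config path, and the property shown is the one this PR sets.

    # Hypothetical example of overriding a Spark property via the loader config;
    # the "spark.conf" nesting is an assumption for illustration.
    "spark": {
      "conf": {
        "spark.databricks.delta.autoCompact.enabled": "false"
      }
    }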

@istreeter merged commit fee675f into develop on Jun 3, 2024
2 checks passed
@istreeter deleted the various-features-for-0.4.0 branch on June 3, 2024, 15:14
istreeter added a commit that referenced this pull request on Jul 11, 2024:

In #62 we added a feature so that bad rows are re-sized if their serialized length exceeds the maximum size allowed by the sink. This fixes a bug which meant the resizing was not working properly.

@istreeter mentioned this pull request on Jul 11, 2024

istreeter added a commit that referenced this pull request on Jul 12, 2024 (same fix as above)

oguzhanunlu pushed a commit that referenced this pull request on Nov 1, 2024 (same fix as above)