Various features for 0.4.0 #62

Merged: 8 commits, Jun 3, 2024
44 changes: 42 additions & 2 deletions modules/core/src/main/resources/reference.conf
@@ -34,18 +34,58 @@
"hoodie.table.name": "events"
"hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator"
"hoodie.table.partition.fields": "load_tstamp"
"hoodie.table.precombine.field": "load_tstamp"
"hoodie.keygen.timebased.timestamp.scalar.time.unit": "microseconds"
"hoodie.keygen.timebased.output.dateformat": "yyyy-MM-dd"
}
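
For illustration of what the keygen options above imply: `load_tstamp` is treated as a scalar timestamp in microseconds and rendered with the `yyyy-MM-dd` output format to build the partition value. The helper below is a hypothetical sketch (assuming UTC), not the key generator's actual code:

import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical sketch: map a scalar microsecond timestamp to the "yyyy-MM-dd"
// partition string that TimestampBasedKeyGenerator is configured to produce.
// Assumes UTC; illustration only.
def partitionValue(loadTstampMicros: Long): String = {
  val instant = Instant.EPOCH.plusNanos(loadTstampMicros * 1000L)
  DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC).format(instant)
}

// Example: partitionValue(1717372800000000L) == "2024-06-03"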

"hudiWriteOptions": {
# -- This loader works most efficiently with BULK_INSERT instead of INSERT
"hoodie.datasource.write.operation": "BULK_INSERT"

# -- Record key and partition settings. Chosen to be consistent with `hudiTableOptions`.
"hoodie.keygen.timebased.timestamp.type": "SCALAR"
"hoodie.datasource.write.operation": "INSERT"
"hoodie.datasource.write.reconcile.schema": "true"
"hoodie.datasource.write.partitionpath.field": "load_tstamp"
"hoodie.schema.on.read.enable": "true"
"hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp"
// "hoodie.embed.timeline.server.reuse.enabled": "true" // TODO: Experiment with this.
"hoodie.metadata.index.column.stats.enable": "true"
"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled": "true"

# -- Configures how Hudi manages the timeline
"hoodie.embed.timeline.server": "true"
"hoodie.embed.timeline.server.reuse.enabled": "true"
"hoodie.filesystem.view.incr.timeline.sync.enable": "true"
"hoodie.filesystem.view.type": "SPILLABLE_DISK"

# -- Hive sync is disabled by default, but if it is enabled then these settings are helpful:
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor"
"hoodie.datasource.hive_sync.partition_fields": "load_tstamp_date"
"hoodie.datasource.hive_sync.support_timestamp": "true"

# -- Clustering: Every 4 commits, rewrite the latest parquet files to boost their size.
"hoodie.clustering.inline": "true"
"hoodie.clustering.plan.partition.filter.mode": "RECENT_DAYS"
"hoodie.clustering.plan.strategy.daybased.lookback.partitions": "1"
"hoodie.clustering.plan.strategy.target.file.max.bytes": "500000000" # 500 MB
"hoodie.clustering.plan.strategy.small.file.limit": "100000000" # 100 MB
"hoodie.clustering.inline.max.commits": "4" # Should be smaller than the ratio `target.file.max.bytes` / `small.file.limit`
"hoodie.clustering.plan.strategy.single.group.clustering.enabled": "false"

# -- Parallelism. This loader works best when we prevent Hudi from ever running >1 task at a time
"hoodie.file.listing.parallelism": "1"
"hoodie.metadata.index.bloom.filter.parallelism": "1"
"hoodie.metadata.index.column.stats.parallelism": "1"
"hoodie.clustering.max.parallelism": "1"
"hoodie.finalize.write.parallelism": "1"
"hoodie.markers.delete.parallelism": "1"
"hoodie.rollback.parallelism": "1"
"hoodie.upsert.shuffle.parallelism": "1"
"hoodie.bloom.index.parallelism": "1"
"hoodie.insert.shuffle.parallelism": "1"
"hoodie.archive.delete.parallelism": "1"
"hoodie.cleaner.parallelism": "1"
"hoodie.clustering.plan.strategy.max.num.groups": "1"
}
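
For context on how write options like these typically take effect: they are passed to Hudi as key/value pairs on the Spark writer. The sketch below is illustrative only; `writeEvents`, the `events` DataFrame, and the option map are placeholders, not this loader's actual write path:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Illustrative sketch: apply a map of Hudi write options (such as the
// `hudiWriteOptions` block above) to a Spark DataFrame write.
// Placeholder names; not the loader's real code.
def writeEvents(events: DataFrame, location: String, hudiWriteOptions: Map[String, String]): Unit =
  events.write
    .format("hudi")
    .options(hudiWriteOptions)   // e.g. "hoodie.datasource.write.operation" -> "BULK_INSERT"
    .mode(SaveMode.Append)
    .save(location)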

"catalog": {
@@ -48,6 +48,16 @@ class HudiWriter(config: Config.Hudi) extends Writer {
LOCATION '${config.location}'
TBLPROPERTIES($tableProps)
""")

// We call clean/archive during startup because it also triggers rollback of any previously
// failed commits. We want to do the rollbacks early, so that we are immediately
// healthy once we start consuming events.

Review comment: A really interesting way of handling the initial state.
spark.sql(s"""
CALL run_clean(table => '$internal_table_name')
""")
spark.sql(s"""
CALL archive_commits(table => '$internal_table_name')
""")
}.void
}
