should fast_pandas_ingest_via_hive clean up hive tables if ingest fails? #45

Open
MichaelTiemannOSC opened this issue Feb 5, 2023 · 0 comments

There is this comment in fast_pandas_ingest_via_hive which acknowledges that the user might give us garbage that cannot be ingested (for one reason or another):

    # verify destination table first, to fail early and avoid creation of hive tables

But ingestion can still fail if Hive is asked to create Parquet datatypes it cannot (such as date), or if Hive and Parquet disagree on data representations (such as timestamp(3) vs. timestamp(6)). Perhaps it would be best to wrap the second half of the function in a try/except statement so that either the ingestion succeeds or the hive table is cleaned up. On the one hand, this would make debugging more difficult because we cannot see the bad table; on the other, it prevents the potential creation of many tables that cannot easily be removed, as owners keep making mistakes until they figure out the right answer (or give up). Something like this (it still needs to be massaged into runnable code by choosing the actual correct exceptions to catch, importing/declaring them, etc.):

    try:
        if verbose:
            print(f"\ndeclaring intermediate hive table {hive_catalog}.{hive_schema}.{hive_table}")
        tabledef = f"create table if not exists {hive_catalog}.{hive_schema}.{hive_table} (\n"
        tabledef += f"{columnschema}\n"
        tabledef += ") with (\n    format = 'parquet',\n"
        if len(partition_columns) > 0:
            tabledef += f"    partitioned_by = array{partition_columns},\n"
        tabledef += f"    external_location = 's3a://{hive_bucket.name}/trino/{hive_schema}/{hive_table}/'\n)"
        _do_sql(tabledef, engine, verbose=verbose)

        if verbose:
            print("\nsyncing partition metadata on intermediate hive table")
        if len(partition_columns) > 0:
            sql = text(f"call {hive_catalog}.system.sync_partition_metadata('{hive_schema}', '{hive_table}', 'FULL')")
            _do_sql(sql, engine, verbose=verbose)

        if overwrite:
            if verbose:
                print(f"\noverwriting data in {catalog}.{schema}.{table}")
            sql = f"delete from {catalog}.{schema}.{table}"
            _do_sql(sql, engine, verbose=verbose)

        if verbose:
            print(f"\ntransferring data: {hive_catalog}.{hive_schema}.{hive_table} -> {catalog}.{schema}.{table}")
        sql = f"insert into {catalog}.{schema}.{table}\nselect * from {hive_catalog}.{hive_schema}.{hive_table}"
        _do_sql(sql, engine, verbose=verbose)
        if verbose:
            print(f"\ndeleting table and data for intermediate table {hive_catalog}.{hive_schema}.{hive_table}")
        oscu.drop_unmanaged_table(hive_catalog, hive_schema, hive_table, engine, hive_bucket, verbose=verbose)
    except SQLAlchemyError:  # from sqlalchemy.exc import SQLAlchemyError
        # Clean up table that will otherwise be orphaned
        if verbose:
            print(f"\ndeleting table and data for intermediate table {hive_catalog}.{hive_schema}.{hive_table}")
        oscu.drop_unmanaged_table(hive_catalog, hive_schema, hive_table, engine, hive_bucket, verbose=verbose)
        raise

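Since the cleanup call is identical on the success and failure paths, another option is to factor it out with try/finally, e.g. via a small context manager wrapped around the second half of the function. A minimal sketch, assuming the same oscu, engine, and hive_bucket objects already in scope in fast_pandas_ingest_via_hive (the _intermediate_hive_table helper itself is hypothetical):

    from contextlib import contextmanager

    @contextmanager
    def _intermediate_hive_table(hive_catalog, hive_schema, hive_table, engine, hive_bucket, verbose=False):
        # Hypothetical helper: drop the intermediate hive table whether the body succeeds or raises
        try:
            yield f"{hive_catalog}.{hive_schema}.{hive_table}"
        finally:
            if verbose:
                print(f"\ndeleting table and data for intermediate table {hive_catalog}.{hive_schema}.{hive_table}")
            oscu.drop_unmanaged_table(hive_catalog, hive_schema, hive_table, engine, hive_bucket, verbose=verbose)

    # Usage sketch inside fast_pandas_ingest_via_hive: the create/sync/insert steps above
    # go in the body, and the intermediate table is dropped no matter how the body exits.
    with _intermediate_hive_table(hive_catalog, hive_schema, hive_table, engine, hive_bucket, verbose=verbose) as hive_fqname:
        _do_sql(tabledef, engine, verbose=verbose)
        sql = f"insert into {catalog}.{schema}.{table}\nselect * from {hive_fqname}"
        _do_sql(sql, engine, verbose=verbose)

The tradeoff is the same as with try/except: the bad table is no longer available for debugging after a failure.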
We currently have four such orphan tables in the hive ingest schema. We'd have more, but I've been manually pruning the ones I create (using oscu.drop_unmanaged_table so as to reclaim their storage as well).
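For reference, a one-off pruning call looks roughly like this (the table name is illustrative; hive_catalog, hive_schema, engine, and hive_bucket are the same objects used during ingest):

    # Hypothetical example: manually drop one orphaned intermediate table and reclaim its storage
    oscu.drop_unmanaged_table(hive_catalog, hive_schema, "orphaned_table_name", engine, hive_bucket, verbose=True)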
