From 166cd75b7a7c26eaf9269e3de641feff92a64d99 Mon Sep 17 00:00:00 2001 From: dichenli Date: Mon, 18 Dec 2017 19:42:37 -0800 Subject: [PATCH] 1. Add region argument. 2. If sortColumn or PartitionKey fields are None, convert them to empty lists. 3. Fix bug: None timestamp in CreateTime causes export_to_metastore failure. 4. Avoid None database_prefix and table_prefix. 5. Readme updates and fixes. 6. Code style improvements. --- utilities/Hive_metastore_migration/README.md | 38 +++- .../src/export_from_datacatalog.py | 14 +- .../src/hive_metastore_migration.py | 215 ++++++++++++------ .../src/import_into_datacatalog.py | 36 +-- 4 files changed, 200 insertions(+), 103 deletions(-) diff --git a/utilities/Hive_metastore_migration/README.md b/utilities/Hive_metastore_migration/README.md index eac18df..a20ffe5 100644 --- a/utilities/Hive_metastore_migration/README.md +++ b/utilities/Hive_metastore_migration/README.md @@ -134,6 +134,10 @@ Below are instructions for using each of the migration workflows described above you created to point to the Hive metastore. It is used to extract the Hive JDBC connection information using the native Spark library. + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. + - `--database-prefix` (optional) set to a string prefix that is applied to the database name created in AWS Glue Data Catalog. You can use it as a way to track the origin of the metadata, and avoid naming conflicts. The default @@ -164,7 +168,8 @@ If the above solutions don't apply to your situation, you can choose to first migrate your Hive metastore to Amazon S3 objects as a staging area, then run an ETL job to import the metadata from S3 to the AWS Glue Data Catalog. To do this, you need to have a Spark 2.1.x cluster that can connect to your Hive metastore and export -metadata to plain files on S3. +metadata to plain files on S3. The Hive metastore to S3 migration can also run +as an Glue ETL job, if AWS Glue can directly connect to your Hive metastore. 1. Make the MySQL connector jar available to the Spark cluster on the master and all worker nodes. Include the jar in the Spark driver class path as well @@ -229,9 +234,12 @@ metadata to plain files on S3. Add the following parameters. - `--mode` set to `from-s3` - - `--database-input-path` set to the S3 path containing only databases. - - `--table-input-path` set to the S3 path containing only tables. - - `--partition-input-path` set to the S3 path containing only partitions. + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. + - `--database-input-path` set to the S3 path containing only databases. For example: `s3://someBucket/output_path_from_previous_job/databases` + - `--table-input-path` set to the S3 path containing only tables. For example: `s3://someBucket/output_path_from_previous_job/tables` + - `--partition-input-path` set to the S3 path containing only partitions. For example: `s3://someBucket/output_path_from_previous_job/partitions` Also, because there is no need to connect to any JDBC source, the job doesn't require any connections. @@ -315,6 +323,9 @@ metadata to plain files on S3. directly to a jdbc Hive Metastore - `--connection-name` set to the name of the AWS Glue connection you created to point to the Hive metastore. It is the destination of the migration. + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. - `--database-names` set to a semi-colon(;) separated list of database names to export from Data Catalog. @@ -333,7 +344,10 @@ metadata to plain files on S3. instructions above. Since the destination is now an S3 bucket instead of a Hive metastore, no connections are required. In the job, add the following parameters: - - `--mode` set to `to-S3`, which means the migration is to S3. + - `--mode` set to `to-s3`, which means the migration is to S3. + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. - `--database-names` set to a semi-colon(;) separated list of database names to export from Data Catalog. - `--output-path` set to the S3 destination path. @@ -365,8 +379,7 @@ metadata to plain files on S3. #### AWS Glue Data Catalog to another AWS Glue Data Catalog -Currently, you cannot access an AWS Glue Data Catalog in another account. -However, you can migrate (copy) metadata from the Data Catalog in one account to another. The steps are: +You can migrate (copy) metadata from the Data Catalog in one account to another. The steps are: 1. Enable cross-account access for an S3 bucket so that both source and target accounts can access it. See [the Amazon S3 documenation](http://docs.aws.amazon.com/AmazonS3/latest/dev/example-bucket-policies.html#example-bucket-policies-use-case-1) @@ -379,7 +392,7 @@ However, you can migrate (copy) metadata from the Data Catalog in one account to 3. Upload the the following scripts to an S3 bucket accessible from the target AWS account to be updated: - export_from_datacatalog.py + import_into_datacatalog.py hive_metastore_migration.py 4. In the source AWS account, create a job on the AWS Glue console to extract metadata from the AWS Glue Data Catalog to S3. @@ -391,7 +404,10 @@ However, you can migrate (copy) metadata from the Data Catalog in one account to Add the following parameters: - - `--mode` set to `to-S3`, which means the migration is to S3. + - `--mode` set to `to-s3`, which means the migration is to S3. + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. - `--database-names` set to a semi-colon(;) separated list of database names to export from Data Catalog. - `--output-path` set to the S3 destination path that you configured with **cross-account access**. @@ -407,10 +423,12 @@ However, you can migrate (copy) metadata from the Data Catalog in one account to Add the following parameters. - `--mode` set to `from-s3` + - `--region` the AWS region for Glue Data Catalog, for example, `us-east-1`. + You can find a list of Glue supported regions here: http://docs.aws.amazon.com/general/latest/gr/rande.html#glue_region. + If not provided, `us-east-1` is used as default. - `--database-input-path` set to the S3 path containing only databases. - `--table-input-path` set to the S3 path containing only tables. - `--partition-input-path` set to the S3 path containing only partitions. 6. (Optional) Manually delete the temporary files generated in the S3 folder. Also, remember to revoke the cross-account access if it's not needed anymore. - diff --git a/utilities/Hive_metastore_migration/src/export_from_datacatalog.py b/utilities/Hive_metastore_migration/src/export_from_datacatalog.py index 6fd77c3..1481666 100644 --- a/utilities/Hive_metastore_migration/src/export_from_datacatalog.py +++ b/utilities/Hive_metastore_migration/src/export_from_datacatalog.py @@ -18,7 +18,7 @@ from hive_metastore_migration import * -CONNECTION_TYPE_NAME = "com.amazonaws.services.glue.connections.DataCatalogConnection" +CONNECTION_TYPE_NAME = 'com.amazonaws.services.glue.connections.DataCatalogConnection' def transform_catalog_to_df(dyf): return dyf.toDF() @@ -50,7 +50,7 @@ def datacatalog_migrate_to_hive_metastore(sc, sql_context, databases, tables, pa hive_metastore.export_to_metastore() -def read_databases_from_catalog(sql_context, glue_context, datacatalog_name, database_arr): +def read_databases_from_catalog(sql_context, glue_context, datacatalog_name, database_arr, region): databases = None tables = None partitions = None @@ -59,7 +59,9 @@ def read_databases_from_catalog(sql_context, glue_context, datacatalog_name, dat dyf = glue_context.create_dynamic_frame.from_options( connection_type=CONNECTION_TYPE_NAME, - connection_options={"catalog.name": datacatalog_name, "catalog.database": database}) + connection_options={'catalog.name': datacatalog_name, + 'catalog.database': database, + 'catalog.region': region}) df = transform_catalog_to_df(dyf) @@ -88,6 +90,7 @@ def main(): parser.add_argument('--database-names', required=True, help='Semicolon-separated list of names of database in Datacatalog to export') parser.add_argument('-o', '--output-path', required=False, help='Output path, either local directory or S3 path') parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection') + parser.add_argument('-R', '--region', required=False, help='AWS region of source Glue DataCatalog, default to "us-east-1"') options = get_options(parser, sys.argv) if options['mode'] == to_s3: @@ -105,6 +108,8 @@ def main(): else: raise AssertionError('unknown mode ' + options['mode']) + validate_aws_regions(options['region']) + # spark env (conf, sc, sql_context) = get_spark_env() glue_context = GlueContext(sc) @@ -116,7 +121,8 @@ def main(): sql_context=sql_context, glue_context=glue_context, datacatalog_name='datacatalog', - database_arr=database_arr + database_arr=database_arr, + region=options.get('region') or 'us-east-1' ) if options['mode'] == to_s3: diff --git a/utilities/Hive_metastore_migration/src/hive_metastore_migration.py b/utilities/Hive_metastore_migration/src/hive_metastore_migration.py index 1b7ec79..1656225 100644 --- a/utilities/Hive_metastore_migration/src/hive_metastore_migration.py +++ b/utilities/Hive_metastore_migration/src/hive_metastore_migration.py @@ -2,9 +2,9 @@ # Licensed under the Amazon Software License (the "License"). You may not use # this file except in compliance with the License. A copy of the License is # located at -# +# # http://aws.amazon.com/asl/ -# +# # and in the "LICENSE" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express # or implied. See the License for the specific language governing @@ -15,6 +15,7 @@ import sys import argparse import re +import logging from time import localtime, strftime from types import MethodType from datetime import tzinfo, datetime, timedelta @@ -34,34 +35,34 @@ DATACATALOG_STORAGE_DESCRIPTOR_SCHEMA = \ StructType([ - StructField('inputFormat', StringType(), True), - StructField('compressed', BooleanType(), False), - StructField('storedAsSubDirectories', BooleanType(), False), - StructField('location', StringType(), True), - StructField('numberOfBuckets', IntegerType(), False), - StructField('outputFormat', StringType(), True), - StructField('bucketColumns', ArrayType(StringType(), True), True), - StructField('columns', ArrayType(StructType([ - StructField('name', StringType(), True), - StructField('type', StringType(), True), - StructField('comment', StringType(), True) - ]), True), True), - StructField('parameters', MapType(StringType(), StringType(), True), True), - StructField('serdeInfo', StructType([ - StructField('name', StringType(), True), - StructField('serializationLibrary', StringType(), True), - StructField('parameters', MapType(StringType(), StringType(), True), True) - ]), True), - StructField('skewedInfo', StructType([ - StructField('skewedColumnNames', ArrayType(StringType(), True), True), - StructField('skewedColumnValueLocationMaps', MapType(StringType(), StringType(), True), True), - StructField('skewedColumnValues', ArrayType(StringType(), True), True) - ]), True), - StructField('sortColumns', ArrayType(StructType([ - StructField('column', StringType(), True), - StructField('order', IntegerType(), True) - ]), True), True) - ]) + StructField('inputFormat', StringType(), True), + StructField('compressed', BooleanType(), False), + StructField('storedAsSubDirectories', BooleanType(), False), + StructField('location', StringType(), True), + StructField('numberOfBuckets', IntegerType(), False), + StructField('outputFormat', StringType(), True), + StructField('bucketColumns', ArrayType(StringType(), True), True), + StructField('columns', ArrayType(StructType([ + StructField('name', StringType(), True), + StructField('type', StringType(), True), + StructField('comment', StringType(), True) + ]), True), True), + StructField('parameters', MapType(StringType(), StringType(), True), True), + StructField('serdeInfo', StructType([ + StructField('name', StringType(), True), + StructField('serializationLibrary', StringType(), True), + StructField('parameters', MapType(StringType(), StringType(), True), True) + ]), True), + StructField('skewedInfo', StructType([ + StructField('skewedColumnNames', ArrayType(StringType(), True), True), + StructField('skewedColumnValueLocationMaps', MapType(StringType(), StringType(), True), True), + StructField('skewedColumnValues', ArrayType(StringType(), True), True) + ]), True), + StructField('sortColumns', ArrayType(StructType([ + StructField('column', StringType(), True), + StructField('order', IntegerType(), True) + ]), True), True) + ]) DATACATALOG_DATABASE_ITEM_SCHEMA = \ StructType([ @@ -104,8 +105,8 @@ DATACATALOG_DATABASE_SCHEMA = \ StructType([ StructField('items', ArrayType( - DATACATALOG_DATABASE_ITEM_SCHEMA, False), - True), + DATACATALOG_DATABASE_ITEM_SCHEMA, False), + True), StructField('type', StringType(), False) ]) @@ -193,7 +194,7 @@ def drop_columns(df, columns_to_drop): def rename_columns(df, rename_tuples=None): """ - rename columns, for each key in rename_map, rename column from key to value + Rename columns, for each key in rename_map, rename column from key to value :param df: dataframe :param rename_map: map for columns to be renamed :return: new dataframe with columns renamed @@ -226,7 +227,7 @@ def join_other_to_single_column(df, other, on, how, new_column_name): def coalesce_by_row_count(df, desired_rows_per_partition=10): """ - coalesce dataframe to reduce number of partitions, to avoid fragmentation of data + Coalesce dataframe to reduce number of partitions, to avoid fragmentation of data :param df: dataframe :param desired_rows_per_partition: desired number of rows per partition, there is no guarantee the actual rows count is larger or smaller @@ -240,7 +241,7 @@ def coalesce_by_row_count(df, desired_rows_per_partition=10): def batch_items_within_partition(sql_context, df, key_col, value_col, values_col): """ - group a DataFrame of key, value pairs, create a list of values for the same key in each spark partition, but there + Group a DataFrame of key, value pairs, create a list of values for the same key in each spark partition, but there is no cross-partition data interaction, so the same key may be shown multiple times in the output dataframe :param sql_context: spark sqlContext :param df: DataFrame with only two columns, a key_col and a value_col @@ -275,7 +276,8 @@ def batch_metastore_partitions(sql_context, df_parts): :param sql_context: the spark SqlContext :param df_parts: the dataframe of partitions with the schema of DATACATALOG_PARTITION_SCHEMA :type df_parts: DataFrame - :return: + :return: a dataframe partition in which each row contains a list of catalog partitions + belonging to the same database and table. """ df_kv = df_parts.select(struct(['database', 'table', 'type']).alias('key'), 'item') batched_kv = batch_items_within_partition(sql_context, df_kv, key_col='key', value_col='item', values_col='items') @@ -470,7 +472,7 @@ def udf_escape_chars(param_value): .replace('"', '\\"')\ .replace('{', '\\{')\ .replace(':', '\\:')\ - .replace('}', '\\}')\ + .replace('}', '\\}') return ret_param_value @@ -538,6 +540,25 @@ def transform_timestamp_cols(df, date_cols_map): df = HiveMetastoreTransformer.utc_timestamp_to_iso8601_time(df, k, v) return df + @staticmethod + def fill_none_with_empty_list(df, column): + """ + Given a column of array type, fill each None value with empty list. + This is not doable by df.na.fill(), Spark will throw Unsupported value type java.util.ArrayList ([]). + :param df: dataframe with array type + :param column: column name string, the column must be array type + :return: dataframe that fills None with empty list for the given column + """ + return HiveMetastoreTransformer.modify_column_by_udf( + df=df, + udf=UserDefinedFunction( + lambda lst: [] if lst is None else lst, + get_schema_type(df, column) + ), + column_to_modify=column, + new_column_name=column + ) + @staticmethod def join_dbs_tbls(ms_dbs, ms_tbls): return ms_dbs.select('DB_ID', 'NAME').join(other=ms_tbls, on='DB_ID', how='inner') @@ -606,7 +627,7 @@ def transform_ms_serde_info(self, ms_serdes, ms_serde_params): serde_with_params = self.join_with_params(df=ms_serdes, df_params=escaped_serde_params, id_col='SERDE_ID') serde_info = serde_with_params.rename_columns(rename_tuples=[ - ('NAME', 'name'), + ('NAME', 'name'), ('SLIB', 'serializationLibrary') ]) return serde_info @@ -642,7 +663,9 @@ def transform_storage_descriptors(self, ms_sds, ms_sd_params, ms_columns, ms_buc ('IS_STOREDASSUBDIRECTORIES', 'storedAsSubDirectories') ]) - storage_descriptors_final = storage_descriptors_renamed.drop_columns(['SERDE_ID', 'CD_ID']) + storage_descriptors_with_empty_sorted_cols = HiveMetastoreTransformer.fill_none_with_empty_list( + storage_descriptors_renamed, 'sortColumns') + storage_descriptors_final = storage_descriptors_with_empty_sorted_cols.drop_columns(['SERDE_ID', 'CD_ID']) return storage_descriptors_final def transform_tables(self, db_tbl_joined, ms_table_params, storage_descriptors, ms_partition_keys): @@ -653,8 +676,8 @@ def transform_tables(self, db_tbl_joined, ms_table_params, storage_descriptors, tbls_with_params = self.join_with_params(df=tbls_date_transformed, df_params=self.transform_param_value(ms_table_params), id_col='TBL_ID') partition_keys = self.transform_ms_partition_keys(ms_partition_keys) - tbls_joined = tbls_with_params \ - .join(other=partition_keys, on='TBL_ID', how='left_outer') \ + tbls_joined = tbls_with_params\ + .join(other=partition_keys, on='TBL_ID', how='left_outer')\ .join_other_to_single_column(other=storage_descriptors, on='SD_ID', how='left_outer', new_column_name='storageDescriptor') @@ -672,7 +695,9 @@ def transform_tables(self, db_tbl_joined, ms_table_params, storage_descriptors, tbls_dropped_cols = tbls_renamed.drop_columns(['DB_ID', 'TBL_ID', 'SD_ID', 'LINK_TARGET_ID']) tbls_drop_invalid = tbls_dropped_cols.na.drop(how='any', subset=['name', 'database']) - tbls_final = tbls_drop_invalid.select( + tbls_with_empty_part_cols = HiveMetastoreTransformer.fill_none_with_empty_list( + tbls_drop_invalid, 'partitionKeys') + tbls_final = tbls_with_empty_part_cols.select( 'database', struct(remove(tbls_dropped_cols.columns, 'database')).alias('item') ).withColumn('type', lit('table')) return tbls_final @@ -792,7 +817,7 @@ def udf_partition_name_from_keys_vals(keys, vals): @staticmethod def udf_milliseconds_str_to_timestamp(milliseconds_str): - return long(milliseconds_str) / 1000 + return 0L if milliseconds_str is None else long(milliseconds_str) / 1000 @staticmethod def udf_string_list_str_to_list(str): @@ -831,11 +856,11 @@ def generate_idx_for_df(df, id_name, col_name, col_schema): :return: new df with exploded rows. """ idx_udf = UserDefinedFunction( - DataCatalogTransformer.udf_array_to_map, + DataCatalogTransformer.udf_array_to_map, MapType(IntegerType(), col_schema, True)) return df.withColumn('idx_columns', idx_udf(col(col_name)))\ - .select(id_name, explode('idx_columns').alias("INTEGER_IDX", "col")) + .select(id_name, explode('idx_columns').alias("INTEGER_IDX", "col")) @staticmethod def column_date_to_timestamp(df, column): @@ -843,8 +868,8 @@ def column_date_to_timestamp(df, column): DataCatalogTransformer.udf_milliseconds_str_to_timestamp, IntegerType()) return df.withColumn(column + '_new', date_to_udf_time_int(col(column)))\ - .drop(column)\ - .withColumnRenamed(column + '_new', column) + .drop(column)\ + .withColumnRenamed(column + '_new', column) @staticmethod def params_to_df(df, id_name): @@ -891,7 +916,7 @@ def extract_dbs(self, databases): .alias('locationUriNew'))\ .drop('locationUri')\ .withColumnRenamed('locationUriNew', 'locationUri') - + return ms_dbs def reformat_dbs(self, ms_dbs): @@ -899,14 +924,14 @@ def reformat_dbs(self, ms_dbs): ('locationUri', 'DB_LOCATION_URI'), ('name', 'NAME') ]) - + return ms_dbs def extract_tbls(self, tables, ms_dbs): ms_tbls_no_id = tables\ - .join(ms_dbs, tables.database == ms_dbs.NAME, 'inner')\ - .select(tables.database, tables.item, ms_dbs.DB_ID)\ - .select('DB_ID', 'database', 'item.*')# database col needed for later + .join(ms_dbs, tables.database == ms_dbs.NAME, 'inner')\ + .select(tables.database, tables.item, ms_dbs.DB_ID)\ + .select('DB_ID', 'database', 'item.*')# database col needed for later ms_tbls = self.generate_id_df(ms_tbls_no_id, 'TBL_ID') return ms_tbls @@ -927,7 +952,7 @@ def reformat_tbls(self, ms_tbls): ('viewExpandedText', 'VIEW_EXPANDED_TEXT'), ('viewOriginalText', 'VIEW_ORIGINAL_TEXT') ]) - + return ms_tbls def get_name_for_partitions(self, ms_partitions, ms_tbls): @@ -939,21 +964,21 @@ def get_name_for_partitions(self, ms_partitions, ms_tbls): ms_partitions = ms_partitions.join(tbls_for_join, ms_partitions.TBL_ID == tbls_for_join.TBL_ID, 'inner')\ .drop(tbls_for_join.TBL_ID)\ - .withColumn('PART_NAME', combine_part_key_and_vals(col('partitionKeys'), col('values')) )\ + .withColumn('PART_NAME', combine_part_key_and_vals(col('partitionKeys'), col('values')))\ .drop('partitionKeys') return ms_partitions def extract_partitions(self, partitions, ms_dbs, ms_tbls): ms_partitions = partitions.join(ms_dbs, partitions.database == ms_dbs.NAME, 'inner')\ - .select(partitions.item, ms_dbs.DB_ID, partitions.table)\ - + .select(partitions.item, ms_dbs.DB_ID, partitions.table) + cond = [ms_partitions.table == ms_tbls.TBL_NAME, ms_partitions.DB_ID == ms_tbls.DB_ID] ms_partitions = ms_partitions.join(ms_tbls, cond, 'inner')\ - .select(ms_partitions.item, ms_tbls.TBL_ID)\ - .select('TBL_ID', 'item.*') - + .select(ms_partitions.item, ms_tbls.TBL_ID)\ + .select('TBL_ID', 'item.*') + # generate PART_ID ms_partitions = self.generate_id_df(ms_partitions, 'PART_ID') @@ -1018,7 +1043,7 @@ def reformat_sds(self, ms_sds): ms_sds = self.generate_id_df(ms_sds, 'CD_ID') ms_sds = self.generate_id_df(ms_sds, 'SERDE_ID') - + return ms_sds def extract_from_dbs(self, hms, ms_dbs): @@ -1029,7 +1054,7 @@ def extract_from_dbs(self, hms, ms_dbs): def extract_from_tbls(self, hms, ms_tbls): ms_table_params = DataCatalogTransformer.params_to_df(ms_tbls, 'TBL_ID') - + part_key_schema = StructType([ StructField('name', StringType(), True), StructField('type', StringType(), True), @@ -1060,7 +1085,7 @@ def extract_from_partitions(self, hms, ms_partitions): ms_partition_key_vals = \ DataCatalogTransformer.generate_idx_for_df(ms_partitions, 'PART_ID', 'values', part_key_val_schema)\ - .withColumnRenamed('col', 'PART_KEY_VAL') + .withColumnRenamed('col', 'PART_KEY_VAL') hms.ms_partition_key_vals = ms_partition_key_vals hms.ms_partition_params = ms_partition_params @@ -1072,7 +1097,7 @@ def extract_from_sds(self, hms, ms_sds): ms_sd_params = DataCatalogTransformer.params_to_df(ms_sds, 'SD_ID') ms_cds = ms_sds.select('CD_ID') - + ms_columns = self.extract_from_sds_columns(ms_sds) (ms_serdes, ms_serde_params) = self.extract_from_sds_serde_info(ms_sds) @@ -1082,7 +1107,7 @@ def extract_from_sds(self, hms, ms_sds): hms.ms_sd_params = ms_sd_params hms.ms_cds = ms_cds hms.ms_columns = ms_columns - hms.ms_serdes = ms_serdes + hms.ms_serdes = ms_serdes hms.ms_serde_params = ms_serde_params hms.ms_sort_cols = ms_sort_cols @@ -1130,15 +1155,15 @@ def extract_from_sds_skewed_info(self, hms, ms_sds): # with extra field 'STRING_LIST_STR' skewed_col_value_loc_map = skewed_info\ .select('SD_ID', explode('skewedColumnValueLocationMaps')\ - .alias('STRING_LIST_STR', 'LOCATION'))\ - + .alias('STRING_LIST_STR', 'LOCATION')) + skewed_col_value_loc_map = self.generate_id_df(skewed_col_value_loc_map, 'STRING_LIST_ID_KID') - udf_string_list_list = UserDefinedFunction(DataCatalogTransformer.udf_string_list_str_to_list, + udf_string_list_list = UserDefinedFunction(DataCatalogTransformer.udf_string_list_str_to_list, ArrayType(StringType(), True)) skewed_string_list_values = skewed_col_value_loc_map\ - .select(col('STRING_LIST_ID_KID').alias('STRING_LIST_ID'), + .select(col('STRING_LIST_ID_KID').alias('STRING_LIST_ID'), udf_string_list_list('STRING_LIST_STR').alias('STRING_LIST_LIST')) ms_skewed_string_list_values = DataCatalogTransformer.generate_idx_for_df( @@ -1165,9 +1190,9 @@ def extract_from_sds_sort_cols(self, ms_sds): StructField('column', StringType(), True), StructField('order', IntegerType(), True) ]))\ - .select('SD_ID', 'INTEGER_IDX', 'col.*')\ - .withColumnRenamed('column', 'COLUMN_NAME')\ - .withColumnRenamed('order', 'ORDER') + .select('SD_ID', 'INTEGER_IDX', 'col.*')\ + .withColumnRenamed('column', 'COLUMN_NAME')\ + .withColumnRenamed('order', 'ORDER') def get_start_id_for_id_name(self, hms): hms.extract_metastore() @@ -1278,6 +1303,7 @@ def extract_metastore(self): # order of write matters here def export_to_metastore(self): + self.ms_dbs.show() self.write_table(connection=self.connection, table_name='DBS', df=self.ms_dbs) self.write_table(connection=self.connection, table_name='DATABASE_PARAMS', df=self.ms_database_params) self.write_table(connection=self.connection, table_name='CDS', df=self.ms_cds) @@ -1288,11 +1314,11 @@ def export_to_metastore(self): self.write_table(connection=self.connection, table_name='SD_PARAMS', df=self.ms_sd_params) self.write_table(connection=self.connection, table_name='SKEWED_COL_NAMES', df=self.ms_skewed_col_names) self.write_table(connection=self.connection, table_name='SKEWED_STRING_LIST', df=self.ms_skewed_string_list) - self.write_table(connection=self.connection, table_name='SKEWED_STRING_LIST_VALUES', + self.write_table(connection=self.connection, table_name='SKEWED_STRING_LIST_VALUES', df=self.ms_skewed_string_list_values) - self.write_table(connection=self.connection, table_name='SKEWED_COL_VALUE_LOC_MAP', + self.write_table(connection=self.connection, table_name='SKEWED_COL_VALUE_LOC_MAP', df=self.ms_skewed_col_value_loc_map) - self.write_table(connection=self.connection, table_name='SORT_COLS', + self.write_table(connection=self.connection, table_name='SORT_COLS', df=self.ms_sort_cols) self.write_table(connection=self.connection, table_name='TBLS', df=self.ms_tbls) self.write_table(connection=self.connection, table_name='TABLE_PARAMS', df=self.ms_table_params) @@ -1433,6 +1459,47 @@ def validate_options_in_mode(options, mode, required_options, not_allowed_option raise AssertionError('Option %s is not allowed for mode %s' % (option, mode)) +def validate_aws_regions(region): + """ + To validate the region in the input. The region list below may be outdated as AWS and Glue expands, so it only + create an error message if validation fails. + If the migration destination is in a region other than Glue supported regions, the job will fail. + :return: None + """ + if region is None: + return + + aws_glue_regions = [ + 'ap-northeast-1' # Tokyo + 'eu-west-1', # Ireland + 'us-east-1', # North Virginia + 'us-east-2', # Ohio + 'us-west-2', # Oregon + ] + + aws_regions = aws_glue_regions + [ + 'ap-northeast-2', # Seoul + 'ap-south-1', # Mumbai + 'ap-southeast-1', # Singapore + 'ap-southeast-2', # Sydney + 'ca-central-1', # Montreal + 'cn-north-1', # Beijing + 'cn-northwest-1', # Ningxia + 'eu-central-1', # Frankfurt + 'eu-west-2', # London + 'sa-east-1', # Sao Paulo + 'us-gov-west-1', # GovCloud + 'us-west-1' # Northern California + ] + + error_msg = "Invalid region: {0}, the job will fail if the destination is not in a Glue supported region".format(region) + if region not in aws_regions: + logging.error(error_msg) + elif region not in aws_glue_regions: + logging.warn(error_msg) + + + def main(): options = parse_arguments(sys.argv) diff --git a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py index 33f3636..5af6350 100644 --- a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py +++ b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py @@ -33,7 +33,7 @@ def transform_df_to_catalog_import_schema(sql_context, glue_context, df_database return dyf_databases, dyf_tables, dyf_partitions -def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions): +def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region): # TEMP: get around datacatalog writer performance issue limited_partitions = partitions.limit(10) @@ -43,17 +43,18 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t # load glue_context.write_dynamic_frame.from_options( - frame=dyf_databases, connection_type="catalog", - connection_options={"catalog.name": datacatalog_name}) + frame=dyf_databases, connection_type='catalog', + connection_options={'catalog.name': datacatalog_name, 'catalog.region': region}) glue_context.write_dynamic_frame.from_options( - frame=dyf_tables, connection_type="catalog", - connection_options={"catalog.name": datacatalog_name}) + frame=dyf_tables, connection_type='catalog', + connection_options={'catalog.name': datacatalog_name, 'catalog.region': region}) glue_context.write_dynamic_frame.from_options( - frame=dyf_partitions, connection_type="catalog", - connection_options={"catalog.name": datacatalog_name}) + frame=dyf_partitions, connection_type='catalog', + connection_options={'catalog.name': datacatalog_name, 'catalog.region': region}) -def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix): +def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix + , region): # extract hive_metastore = HiveMetastore(connection, sql_context) hive_metastore.extract_metastore() @@ -63,11 +64,11 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata sc, sql_context, db_prefix, table_prefix).transform(hive_metastore) #load - import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions) + import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region) def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir, - datacatalog_name): + datacatalog_name, region): # extract databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA) @@ -75,7 +76,7 @@ def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_d partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA) # load - import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions) + import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region) def main(): @@ -85,6 +86,7 @@ def main(): parser = argparse.ArgumentParser(prog=sys.argv[0]) parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3') parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection') + parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"') parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog') parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table name in Glue DataCatalog') parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities') @@ -107,6 +109,8 @@ def main(): else: raise AssertionError('unknown mode ' + options['mode']) + validate_aws_regions(options['region']) + # spark env (conf, sc, sql_context) = get_spark_env() glue_context = GlueContext(sc) @@ -119,7 +123,8 @@ def main(): db_input_dir=options['database_input_path'], tbl_input_dir=options['table_input_path'], parts_input_dir=options['partition_input_path'], - datacatalog_name='datacatalog' + datacatalog_name='datacatalog', + region=options.get('region') or 'us-east-1' ) elif options['mode'] == from_jdbc: glue_context.extract_jdbc_conf(options['connection_name']) @@ -128,9 +133,10 @@ def main(): sql_context=sql_context, glue_context=glue_context, connection=glue_context.extract_jdbc_conf(options['connection_name']), - db_prefix=options['database_prefix'] if options.has_key('database_prefix') else "", - table_prefix=options['table_prefix'] if options.has_key('table_prefix') else "", - datacatalog_name='datacatalog' + db_prefix=options.get('database_prefix') or '', + table_prefix=options.get('table_prefix') or '', + datacatalog_name='datacatalog', + region=options.get('region') or 'us-east-1' ) if __name__ == '__main__':