diff --git a/CHANGELOG.md b/CHANGELOG.md index f828b277..6738a98b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +2.2.3 (2023-06-28) +------------------- + +*Changes* +- Allow retention period in days to be optionally set in the config using the `retention` parameter + 2.2.2 (2023-04-27) ------------------- diff --git a/README.md b/README.md index 0083d115..308d43f2 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,7 @@ Full list of options in `config.json`: | archive_load_files_s3_bucket | String | | (Default: Value of `s3_bucket`) When `archive_load_files` is enabled, the archived files will be placed in this bucket. | s3_proxies | Object | No | (Default: None) If not set then http_proxy and https_proxy and other environmental settings will dictate which proxy is used. If this is set then you can specify a proxy for the S3 Upload connection to use, or set to `{}` to force the S3 Uploader to bypass a proxy entirely | replication_method | String | No | If not set then target-snowflake behaves as normal, with inserts or upserts into an existing table. If this is set to the value `"truncate"` then the target table is truncated before loading commences | +| retention | Integer | No | If not set then target-snowflake creates tables with the default retention period for the database. If the value is set then this is the number of days to indicate in the data_retention_time_in_days parameter. | ### To run tests: diff --git a/target_snowflake/db_sync.py b/target_snowflake/db_sync.py index c179f419..5e0c9c22 100644 --- a/target_snowflake/db_sync.py +++ b/target_snowflake/db_sync.py @@ -71,6 +71,10 @@ def validate_config(config): if replication_method not in ['append','truncate']: errors.append(f'Unrecognised replication_method: {replication_method} - valid values are append, truncate') + retention = config.get('retention',0) + if (not isinstance(retention,int) or retention < 0): + errors.append(f'Retention period invalid: {retention} - must be a positive integer indicating the number of days') + return errors @@ -571,8 +575,27 @@ def create_table_query(self, is_temporary=False): p_temp = 'TEMP ' if is_temporary else '' p_table_name = self.table_name(stream_schema_message['stream'], is_temporary) p_columns = ', '.join(columns + primary_key) - p_extra = 'data_retention_time_in_days = 0 ' if is_temporary else 'data_retention_time_in_days = 1 ' - return f'CREATE {p_temp}TABLE IF NOT EXISTS {p_table_name} ({p_columns}) {p_extra}' + retention = '' + + if is_temporary: + retention = "data_retention_time_in_days = 0" + elif self.connection_config.get('retention'): + retention = f"data_retention_time_in_days = {self.connection_config.get('retention')}" + + create_table_statement = ( + "CREATE " + f"{p_temp}" + "TABLE IF NOT EXISTS " + f"{p_table_name}" + " (" + f"{p_columns}" + ") " + f"{retention}" + ) + + self.logger.debug(create_table_statement) + + return create_table_statement def grant_usage_on_schema(self, schema_name, grantee): """Grant usage on schema"""