diff --git a/src/astro/databases/snowflake.py b/src/astro/databases/snowflake.py
index d6c2fe0e8..26e34c528 100644
--- a/src/astro/databases/snowflake.py
+++ b/src/astro/databases/snowflake.py
@@ -1,5 +1,6 @@
 """Snowflake database implementation."""
 import logging
+import os
 import random
 import string
 from dataclasses import dataclass, field
@@ -291,6 +292,51 @@ def drop_stage(self, stage: SnowflakeStage) -> None:
     # Table load methods
     # ---------------------------------------------------------
 
+    def load_file_to_table_natively(
+        self,
+        source_file: File,
+        target_table: Table,
+        if_exists: LoadExistStrategy = "replace",
+        native_support_kwargs: Optional[Dict] = None,
+        **kwargs,
+    ):
+        """
+        Load the content of a file to an existing Snowflake table natively by:
+        - Creating a Snowflake external stage
+        - Using the Snowflake COPY INTO statement
+
+        Requirements:
+        - The user must have permission to create a STAGE in Snowflake.
+        - If loading from GCP Cloud Storage, `native_support_kwargs` must define `storage_integration`.
+        - If loading from AWS S3, the credentials for creating the stage may be retrieved from
+          the Airflow connection or from the `storage_integration` attribute within `native_support_kwargs`.
+
+        :param source_file: File from which the data will be transferred
+        :param target_table: Table to which the content of the file will be loaded
+        :param if_exists: Strategy used to load the data (currently supported: "append" or "replace")
+        :param native_support_kwargs: May contain `storage_integration`, used during stage creation
+
+        .. seealso::
+            `Snowflake official documentation on COPY INTO
+            <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html>`_
+            `Snowflake official documentation on CREATE STAGE
+            <https://docs.snowflake.com/en/sql-reference/sql/create-stage.html>`_
+        """
+        native_support_kwargs = native_support_kwargs or {}
+        storage_integration = native_support_kwargs.get("storage_integration")
+        # Create an external stage pointing at the file's location.
+        stage = self.create_stage(
+            file=source_file, storage_integration=storage_integration
+        )
+        table_name = self.get_table_qualified_name(target_table)
+        file_path = os.path.basename(source_file.path) or ""
+        sql_statement = (
+            f"COPY INTO {table_name} FROM @{stage.qualified_name}/{file_path}"
+        )
+        self.hook.run(sql_statement)
+        # Drop the stage once the data has been copied, so it does not linger.
+        self.drop_stage(stage)
+
     def load_pandas_dataframe_to_table(
         self,
         source_dataframe: pd.DataFrame,
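
Usage sketch: a minimal illustration of how the new method might be invoked, assuming an astro-sdk layout matching this diff; the connection ID, bucket path, and storage-integration name below are hypothetical, and the File/Table import paths may vary across astro-sdk versions.

    from astro.databases.snowflake import SnowflakeDatabase
    from astro.files import File        # import path may vary by version
    from astro.sql.table import Table   # import path may vary by version

    db = SnowflakeDatabase(conn_id="snowflake_default")  # hypothetical connection ID
    db.load_file_to_table_natively(
        source_file=File("s3://example-bucket/data/sample.csv"),  # hypothetical path
        target_table=Table(name="sample_table"),
        if_exists="replace",
        # For GCS sources, a storage integration must be supplied:
        # native_support_kwargs={"storage_integration": "my_gcs_integration"},
    )

Per the method body above, this call creates a stage, issues SQL of the form "COPY INTO <schema>.sample_table FROM @<stage_qualified_name>/sample.csv", and drops the stage afterwards.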