Skip to content

Commit

Permalink
Add Snowflake load_file optimization
Browse files Browse the repository at this point in the history
Fix: #430

Co-authored-by: Ankit Chaurasia <[email protected]>
  • Loading branch information
2 people authored and kaxil committed Jul 20, 2022
1 parent cd46468 commit 25598fe
Showing 1 changed file with 43 additions and 0 deletions.
43 changes: 43 additions & 0 deletions src/astro/databases/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Snowflake database implementation."""
import logging
import os
import random
import string
from dataclasses import dataclass, field
Expand Down Expand Up @@ -291,6 +292,48 @@ def drop_stage(self, stage: SnowflakeStage) -> None:
# Table load methods
# ---------------------------------------------------------

def load_file_to_table_natively(
    self,
    source_file: File,
    target_table: Table,
    if_exists: LoadExistStrategy = "replace",
    native_support_kwargs: Optional[Dict] = None,
    **kwargs,
) -> None:
    """
    Load the content of a file to an existing Snowflake table natively by:
    - Creating a Snowflake external stage
    - Using Snowflake COPY INTO statement
    - Dropping the stage afterwards, whether or not the COPY INTO succeeded

    Requirements:
    - The user must have permissions to create a STAGE in Snowflake.
    - If loading from GCP Cloud Storage, `native_support_kwargs` must define `storage_integration`
    - If loading from AWS S3, the credentials for creating the stage may be retrieved from
    the Airflow connection or from the `storage_integration` attribute within `native_support_kwargs`.

    :param source_file: File to be loaded; the base name of its ``path`` is appended
        to the stage location in the COPY INTO statement.
    :param target_table: Table whose qualified name is the COPY INTO destination.
    :param if_exists: Accepted for interface compatibility; it is not read by this method.
    :param native_support_kwargs: Optional settings; only the ``storage_integration``
        key is read here and forwarded to ``create_stage``.
    :param kwargs: Accepted for interface compatibility; not read by this method.
    :raises Exception: Any error raised while running the COPY INTO statement is
        propagated after the stage has been dropped.

    .. seealso::
        `Snowflake official documentation on COPY INTO
        <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html>`_
        `Snowflake official documentation on CREATE STAGE
        <https://docs.snowflake.com/en/sql-reference/sql/create-stage.html>`_
    """
    native_support_kwargs = native_support_kwargs or {}
    storage_integration = native_support_kwargs.get("storage_integration")
    stage = self.create_stage(
        file=source_file, storage_integration=storage_integration
    )
    try:
        table_name = self.get_table_qualified_name(target_table)
        # os.path.basename always returns a str ("" when the path ends with a
        # separator), so no fallback value is needed.
        file_path = os.path.basename(source_file.path)
        sql_statement = (
            f"COPY INTO {table_name} FROM @{stage.qualified_name}/{file_path}"
        )
        self.hook.run(sql_statement)
    finally:
        # Always remove the stage created above so that a failed COPY INTO
        # does not leave it behind in Snowflake.
        self.drop_stage(stage)

def load_pandas_dataframe_to_table(
self,
source_dataframe: pd.DataFrame,
Expand Down

0 comments on commit 25598fe

Please sign in to comment.