Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Support Snowflake's travel time #414

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airbyte/_processors/sql/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ class SnowflakeConfig(SqlConfig):
database: str
role: str
schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
data_retention_time_in_days: int | None = None

@overrides
def get_create_table_extra_clauses(self) -> list[str]:
"""Return a list of clauses to append on CREATE TABLE statements."""
clauses = []

if self.data_retention_time_in_days is not None:
clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")

return clauses

@overrides
def get_database_name(self) -> str:
Expand Down
7 changes: 7 additions & 0 deletions airbyte/shared/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def config_hash(self) -> str | None:
)
)

def get_create_table_extra_clauses(self) -> list[str]:
"""Return a list of clauses to append on CREATE TABLE statements."""
return []
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved

def get_sql_engine(self) -> Engine:
"""Return a new SQL engine to use."""
return create_engine(
Expand Down Expand Up @@ -653,10 +657,13 @@ def _create_table(
pk_str = ", ".join(primary_keys)
column_definition_str += f",\n PRIMARY KEY ({pk_str})"

extra_clauses = "\n".join(self.sql_config.get_create_table_extra_clauses())

cmd = f"""
CREATE TABLE {self._fully_qualified(table_name)} (
{column_definition_str}
)
{extra_clauses}
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
"""
_ = self._execute_sql(cmd)

Expand Down
75 changes: 75 additions & 0 deletions tests/unit_tests/test_processors.py
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from pathlib import Path
from typing import Optional
import pytest_mock
from airbyte.caches.snowflake import SnowflakeSqlProcessor, SnowflakeConfig
from airbyte_protocol.models import ConfiguredAirbyteCatalog
from airbyte.secrets.base import SecretString
from airbyte.shared.catalog_providers import CatalogProvider


def test_snowflake_cache_config_data_retention_time_in_days(
mocker: pytest_mock.MockFixture,
):
expected_cmd = """
CREATE TABLE airbyte_raw."table_name" (
col_name type
)
DATA_RETENTION_TIME_IN_DAYS = 1
"""

def _execute_sql(cmd):
global actual_cmd
actual_cmd = cmd

mocker.patch.object(SnowflakeSqlProcessor, "_execute_sql", side_effect=_execute_sql)
config = _build_mocked_snowflake_processor(mocker, data_retention_time_in_days=1)
config._create_table(table_name="table_name", column_definition_str="col_name type")

assert actual_cmd == expected_cmd


def test_snowflake_cache_config_no_data_retention_time_in_days(
mocker: pytest_mock.MockFixture,
):
expected_cmd = """
CREATE TABLE airbyte_raw."table_name" (
col_name type
)
\n """

def _execute_sql(cmd):
global actual_cmd
actual_cmd = cmd

mocker.patch.object(SnowflakeSqlProcessor, "_execute_sql", side_effect=_execute_sql)
config = _build_mocked_snowflake_processor(mocker)
config._create_table(table_name="table_name", column_definition_str="col_name type")

assert actual_cmd == expected_cmd


def _build_mocked_snowflake_processor(
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
mocker: pytest_mock.MockFixture, data_retention_time_in_days: Optional[int] = None
):
sql_config = SnowflakeConfig(
account="foo",
username="foo",
password=SecretString("foo"),
warehouse="foo",
database="foo",
role="foo",
data_retention_time_in_days=data_retention_time_in_days,
)

mocker.patch.object(
SnowflakeSqlProcessor, "_ensure_schema_exists", return_value=None
)
return SnowflakeSqlProcessor(
catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
temp_dir=Path(),
temp_file_cleanup=True,
sql_config=sql_config,
)