Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle all type-mapping with SQLConnector.jsonschema_to_sql #304

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ target-version = "py39"

[tool.ruff.lint]
ignore = [
"ANN101", # missing-type-self
"ANN102", # missing-type-cls
"ANN201",
"TD",
"D",
Expand Down
86 changes: 13 additions & 73 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import urllib.parse
from enum import Enum
from functools import cached_property
from operator import contains, eq
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any

import snowflake.sqlalchemy.custom_types as sct
import sqlalchemy
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from singer_sdk import typing as th
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import FullyQualifiedName
from singer_sdk.connectors.sql import FullyQualifiedName, JSONSchemaToSQL
from singer_sdk.exceptions import ConfigValidationError
from snowflake.sqlalchemy import URL
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
Expand All @@ -27,30 +25,6 @@

from sqlalchemy.engine import Engine

SNOWFLAKE_MAX_STRING_LENGTH = 16777216


class TypeMap:
    """Pair a predicate with the value to use when that predicate matches.

    A ``TypeMap`` holds a callable (``operator``), an optional second operand
    for it (``match_value``) and the value that the mapping stands for
    (``map_value``).
    """

    def __init__(self, operator, map_value, match_value=None) -> None:  # noqa: ANN001
        """Store the predicate, the mapped value and the optional operand.

        Args:
            operator: Callable used as the predicate. It is called as
                ``operator(compare_value, match_value)`` when ``match_value``
                is provided and as ``operator(compare_value)`` otherwise.
            map_value: Value associated with a successful match.
            match_value: Optional second operand passed to ``operator``.
        """
        self.operator = operator
        self.map_value = map_value
        self.match_value = match_value

    def match(self, compare_value):  # noqa: ANN001
        """Return the result of applying the predicate to ``compare_value``.

        Args:
            compare_value: The value to test.

        Returns:
            Whatever ``operator`` returns, or ``False`` when the call raises
            ``TypeError`` (for example because the operands are incompatible
            or the operator received the wrong number of arguments).
        """
        try:
            # Compare against None instead of relying on truthiness so that
            # falsy operands such as 0, "" or () are still passed through.
            if self.match_value is not None:
                return self.operator(compare_value, self.match_value)
            return self.operator(compare_value)
        except TypeError:
            return False


def evaluate_typemaps(type_maps, compare_value, unmatched_value):  # noqa: ANN001
    """Return the mapped value of the first type map that matches.

    Args:
        type_maps: Iterable of objects exposing ``match(value)`` and
            ``map_value``; they are tried in order.
        compare_value: The value handed to each ``match`` call.
        unmatched_value: Fallback returned when nothing matches.

    Returns:
        ``map_value`` of the first matching entry, else ``unmatched_value``.
    """
    # The generator is lazy, so entries after the first match are never tested.
    hits = (candidate.map_value for candidate in type_maps if candidate.match(compare_value))
    return next(hits, unmatched_value)


class SnowflakeFullyQualifiedName(FullyQualifiedName):
def __init__(
Expand Down Expand Up @@ -89,6 +63,8 @@ class SnowflakeConnector(SQLConnector):
allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

max_varchar_length = 16_777_216

def __init__(self, *args: Any, **kwargs: Any) -> None:
self.table_cache: dict = {}
self.schema_cache: dict = {}
Expand Down Expand Up @@ -309,52 +285,16 @@ def get_column_alter_ddl(
},
)

@staticmethod
def _conform_max_length(jsonschema_type):  # noqa: ANN205, ANN001
    """Limit a JSON Schema ``maxLength`` to Snowflake's VARCHAR length.

    The input mapping is never modified. When ``maxLength`` exceeds
    ``SNOWFLAKE_MAX_STRING_LENGTH`` a shallow copy carrying the clamped
    value is returned; otherwise the input is returned unchanged.

    Args:
        jsonschema_type: JSON Schema type definition supporting ``.get``.

    Returns:
        A type definition whose ``maxLength`` (if any) does not exceed
        ``SNOWFLAKE_MAX_STRING_LENGTH``.
    """
    max_length = jsonschema_type.get("maxLength")
    if max_length and max_length > SNOWFLAKE_MAX_STRING_LENGTH:
        # Build a new dict rather than assigning in place, so the caller's
        # schema object is not altered as a side effect of type mapping.
        return {**jsonschema_type, "maxLength": SNOWFLAKE_MAX_STRING_LENGTH}
    return jsonschema_type

def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
"""Return the SQLAlchemy type for the provided JSON Schema type.

Uses custom Snowflake types from [snowflake-sqlalchemy](https://github.com/snowflakedb/snowflake-sqlalchemy/blob/main/src/snowflake/sqlalchemy/custom_types.py)

Args:
jsonschema_type: The JSON Schema representation of the source type.

Returns:
The SQLAlchemy type representation of the data type.
"""
# start with default implementation
jsonschema_type = SnowflakeConnector._conform_max_length(jsonschema_type)
target_type = super().to_sql_type(jsonschema_type)
# snowflake max and default varchar length
# NOTE(review): from here on this view interleaves the body of to_sql_type
# with the jsonschema_to_sql cached property; the diff +/- markers are not
# visible, so confirm against the real file which lines belong to which.
@cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
# https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
maxlength = jsonschema_type.get("maxLength", SNOWFLAKE_MAX_STRING_LENGTH)
# define type maps
string_submaps = [
TypeMap(eq, TIMESTAMP_NTZ(), "date-time"),
TypeMap(contains, sqlalchemy.types.TIME(), "time"),
TypeMap(eq, sqlalchemy.types.DATE(), "date"),
TypeMap(eq, sqlalchemy.types.VARCHAR(maxlength), None),
]
type_maps = [
TypeMap(th._jsonschema_type_check, NUMBER(), ("integer",)), # noqa: SLF001
TypeMap(th._jsonschema_type_check, VARIANT(), ("object",)), # noqa: SLF001
TypeMap(th._jsonschema_type_check, VARIANT(), ("array",)), # noqa: SLF001
TypeMap(th._jsonschema_type_check, sct.DOUBLE(), ("number",)), # noqa: SLF001
]
# apply type maps
if th._jsonschema_type_check(jsonschema_type, ("string",)): # noqa: SLF001
datelike_type = th.get_datelike_property_type(jsonschema_type)
target_type = evaluate_typemaps(string_submaps, datelike_type, target_type)
else:
target_type = evaluate_typemaps(type_maps, jsonschema_type, target_type)

return cast(sqlalchemy.types.TypeEngine, target_type)
to_sql = super().jsonschema_to_sql
to_sql.register_type_handler("integer", NUMBER)
to_sql.register_type_handler("object", VARIANT)
to_sql.register_type_handler("array", VARIANT)
to_sql.register_type_handler("number", sct.DOUBLE)
to_sql.register_format_handler("date-time", TIMESTAMP_NTZ)
return to_sql

def schema_exists(self, schema_name: str) -> bool:
if schema_name in self.schema_cache:
Expand Down
Loading