Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix SQL type merging for pre-existing target tables #898

Merged
Merged
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 53 additions & 15 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,27 +721,53 @@ def merge_sql_types(
if len(sql_types) == 1:
return sql_types[0]

# Gathering Type to match variables
# sent in _adapt_column_type
current_type = sql_types[0]
# sql_type = sql_types[1]

# Getting the length of each type
# current_type_len: int = getattr(sql_types[0], "length", 0)
sql_type_len: int = getattr(sql_types[1], "length", 0)
if sql_type_len is None:
sql_type_len = 0

# Convert the two types given into a sorted list
# containing the best conversion classes
sql_types = self._sort_types(sql_types)

# If greater than two evaluate the first pair then on down the line
if len(sql_types) > 2:
return self.merge_sql_types(
[self.merge_sql_types([sql_types[0], sql_types[1]])] + sql_types[2:]
)

assert len(sql_types) == 2
generic_type = type(sql_types[0].as_generic())
if isinstance(generic_type, type):
if issubclass(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
return sql_types[0]

elif isinstance(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
return sql_types[0]
# Get the generic type class
for opt in sql_types:
# Get the length
opt_len: int = getattr(opt, "length", 0)
generic_type = type(opt.as_generic())

if isinstance(generic_type, type):
if issubclass(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
# If length None or 0 then is varchar max ?
if (opt_len is None) or (opt_len == 0):
return opt
elif isinstance(
generic_type,
(sqlalchemy.types.String, sqlalchemy.types.Unicode),
):
# If length None or 0 then is varchar max ?
if (opt_len is None) or (opt_len == 0):
return opt
# If best conversion class is equal to current type
# return the best conversion class
elif str(opt) == str(current_type):
return opt

raise ValueError(
f"Unable to merge sql types: {', '.join([str(t) for t in sql_types])}"
Expand Down Expand Up @@ -827,9 +853,21 @@ def _adapt_column_type(
Raises:
NotImplementedError: if altering columns is not supported.
"""
current_type = self._get_column_type(full_table_name, column_name)
current_type: sqlalchemy.types.TypeEngine = self._get_column_type(
full_table_name, column_name
)

# Check if the existing column type and the sql type are the same
if str(sql_type) == str(current_type):
# The current column and sql type are the same
# Nothing to do
return

# Not the same type, generic type or compatible types
# calling merge_sql_types for assistnace
compatible_sql_type = self.merge_sql_types([current_type, sql_type])
if current_type == compatible_sql_type:

if str(compatible_sql_type) == str(current_type):
# Nothing to do
return

Expand Down