Originally created by @tbriggs2 on 2022-05-05 16:43:59
Summary
When SQLSink.process_batch writes to a table that already exists, it checks that the existing column definitions are compatible with the latest schema and alters the table's columns if necessary. That logic is failing, at least with my MySQL target. I believe the error is reproducible with target-sqlite as well, because my MySQL target is based on it.
Steps to reproduce
1. Set up a pipeline that loads data into a MySQL table
2. Execute the pipeline (this will create the table and succeed)
3. Execute the pipeline again
What is the current bug behavior?
Processing fails with an exception coming from the SDK.
What is the expected correct behavior?
Processing should succeed without error.
Relevant logs and/or screenshots
time=2022-05-05 12:26:14 name=sqlalchemy.engine.Engine level=INFO message=SHOW CREATE TABLE `targetTest`
time=2022-05-05 12:26:14 name=sqlalchemy.engine.Engine level=INFO message=[raw sql] ()
Traceback (most recent call last):
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/click/core.py", line 1053, in main
rv = self.invoke(ctx)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/click/core.py", line 754, in invoke
return __callback(*args, **kwargs)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/target_base.py", line 535, in cli
target.listen(file_input)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/io_base.py", line 43, in listen
self._process_endofpipe()
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/target_base.py", line 275, in _process_endofpipe
self.drain_all()
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/target_base.py", line 413, in drain_all
self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/target_base.py", line 436, in _drain_all
self.drain_one(sink)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/target_base.py", line 430, in drain_one
sink.process_batch(draining_status)
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/sinks/sql.py", line 106, in process_batch
self.connector.prepare_table(
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 596, in prepare_table
self.prepare_column(
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 621, in prepare_column
self._adapt_column_type(
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 774, in _adapt_column_type
compatible_sql_type = self.merge_sql_types([current_type, sql_type])
File "/home/tom/.cache/pypoetry/virtualenvs/singer-sqlite-oMbmf0Lm-py3.8/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 689, in merge_sql_types
raise ValueError(
ValueError: Unable to merge sql types: INTEGER, INTEGER
Possible fixes
I believe the error is here. The two types are obviously the same, yet merge_sql_types concludes that they're not. I'm not actually sure how to fix it, though.
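One plausible explanation for "Unable to merge sql types: INTEGER, INTEGER" is that SQLAlchemy type instances don't define value equality, so two separately constructed instances of the same type compare unequal. The sketch below uses a bare stand-in class (not SQLAlchemy's actual INTEGER) to illustrate the pitfall:

```python
# Stand-in for a SQLAlchemy type class: like TypeEngine subclasses,
# it defines no __eq__, so instances fall back to identity comparison.
class INTEGER:
    pass

current_type, sql_type = INTEGER(), INTEGER()

# Naive instance comparison fails even though the types match:
print(current_type == sql_type)              # False: identity equality

# Comparing the classes themselves succeeds:
print(type(current_type) is type(sql_type))  # True
```

If merge_sql_types relies on instance equality (or a string/ordering comparison that behaves the same way), it would fail to recognize two INTEGER columns as identical, matching the error in the traceback.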
@visch I have been looking at this and I think I figured out why the original merge_sql_types() seemed off to me. merge_sql_types() is trying to do two tasks that I think oppose each other:

1. Check whether the types are equal and, if so, return the current column type.
2. Check whether the current type will accept inserts of the new type. If it won't accept inserts, find a type that will allow both data types.

The solution you gave, I think, needs to live in the _adapt_column_type() function that calls merge_sql_types(). I see _adapt_column_type() as answering a question: if the types are the same, the answer is no, don't adapt anything. If they are different, check whether the new type is a subclass of the current type, in which case a conversion will happen on insert. If all of those checks come back false, determine a compatible type using merge_sql_types() and then adapt (alter) the table column to that type. This way, merge_sql_types() only ever works on non-equal types. I made these changes in the PR I submitted. Please take a look and let me know what you think.
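The decision procedure described above can be sketched as follows. All names here are stand-ins for illustration, not the SDK's actual API; the helpers are passed in as callables so the control flow is the focus:

```python
# Hypothetical restructuring of _adapt_column_type() along the lines the
# comment proposes. merge_sql_types and alter_column are assumed helpers.

def adapt_column_type(current_type, sql_type, merge_sql_types, alter_column):
    """Decide whether (and how) to alter an existing column's type."""
    # 1. Identical types: nothing to adapt.
    if type(current_type) is type(sql_type):
        return
    # 2. The new type is a subclass of the current type: the database can
    #    coerce the value on insert, so no ALTER is needed.
    if isinstance(sql_type, type(current_type)):
        return
    # 3. Genuinely different types: find a common compatible type and
    #    alter the column. merge_sql_types() never sees equal types.
    alter_column(merge_sql_types([current_type, sql_type]))
```

With this ordering, the repeated-pipeline case from the bug report (INTEGER vs. INTEGER) returns at step 1 and never reaches merge_sql_types(), so the "Unable to merge sql types" error cannot occur for matching types.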
Migrated from GitLab: https://gitlab.com/meltano/sdk/-/issues/377