
ValueError: Unable to merge sql types: INTEGER, INTEGER #9

Closed
visch opened this issue Jul 17, 2022 · 17 comments
Labels
bug Something isn't working

Comments

@visch
Member

visch commented Jul 17, 2022

To replicate run tap-smoke-test twice.

Traceback (most recent call last):
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 8, in <module>
    sys.exit(TargetPostgres.cli())
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 546, in cli
    target.listen(file_input)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/io_base.py", line 44, in listen
    self._process_endofpipe()
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 276, in _process_endofpipe
    self.drain_all(is_endofpipe=True)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 423, in drain_all
    self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 456, in _drain_all
    Parallel()(delayed(_drain_sink)(sink=sink) for sink in sink_list)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/home/visch/.pyenv/versions/3.8.12/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/home/visch/.pyenv/versions/3.8.12/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/joblib/_parallel_backends.py", line 595, in __call__
    return self.func(*args, **kwargs)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/joblib/parallel.py", line 262, in __call__
    return [func(*args, **kwargs)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/joblib/parallel.py", line 262, in <listcomp>
    return [func(*args, **kwargs)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 453, in _drain_sink
    self.drain_one(sink)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 443, in drain_one
    sink.process_batch(draining_status)
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/sinks/sql.py", line 107, in process_batch
    self.connector.prepare_table(
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 652, in prepare_table
    self.prepare_column(
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 677, in prepare_column
    self._adapt_column_type(
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 829, in _adapt_column_type
    compatible_sql_type = self.merge_sql_types([current_type, sql_type])
  File "/home/visch/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.8/site-packages/singer_sdk/streams/sql.py", line 744, in merge_sql_types
    raise ValueError(
ValueError: Unable to merge sql types: INTEGER, INTEGER
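For context on why two seemingly identical types can fail to merge: SQLAlchemy's generic type objects don't define `__eq__`, so two separately constructed `INTEGER` instances fall back to identity comparison and are never "equal". An illustrative stdlib-only sketch of the pitfall (not the SDK's actual merge logic), using a stand-in class:

```python
# Illustrative sketch (not the SDK's actual merge logic): SQL type
# *instances* that lack __eq__ fall back to identity comparison, so
# two freshly constructed INTEGER objects are never "equal".

class INTEGER:
    """Stand-in for a SQLAlchemy-style generic type object."""

    def __repr__(self) -> str:
        return "INTEGER"

current_type, new_type = INTEGER(), INTEGER()

print(current_type == new_type)              # False - distinct instances
print(repr(current_type) == repr(new_type))  # True - name-based check passes
```

A merge routine that compares rendered type names (or `isinstance` checks) rather than instance equality treats the two as compatible.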
@visch visch added the bug Something isn't working label Jul 17, 2022
@visch
Member Author

visch commented Aug 2, 2022

`poetry run pytest target_postgres/tests/test_standard_target.py::test_relational_data` causes this as well. Going to focus on this one.

@aaronsteers
Contributor

@visch - Looks like this may be resolved with:

Can you confirm?

@visch
Member Author

visch commented Aug 15, 2022

@aaronsteers Yes

Fixes this exception, but then we immediately hit a new one:


The above exception was the direct cause of the following exception:

postgres_target = <target_postgres.target.TargetPostgres object at 0x7f494d910b50>

    def test_relational_data(postgres_target):
        file_name = "user_location_data.singer"
>       singer_file_to_target(file_name, postgres_target)

target_postgres/tests/test_standard_target.py:112:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target_postgres/tests/test_standard_target.py:44: in singer_file_to_target
    target.listen(buf)
.venv/lib/python3.9/site-packages/singer_sdk/io_base.py:44: in listen
    self._process_endofpipe()
.venv/lib/python3.9/site-packages/singer_sdk/target_base.py:276: in _process_endofpipe
    self.drain_all(is_endofpipe=True)
.venv/lib/python3.9/site-packages/singer_sdk/target_base.py:423: in drain_all
    self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
.venv/lib/python3.9/site-packages/singer_sdk/target_base.py:449: in _drain_all
    self.drain_one(sink)
.venv/lib/python3.9/site-packages/singer_sdk/target_base.py:443: in drain_one
    sink.process_batch(draining_status)
.venv/lib/python3.9/site-packages/singer_sdk/sinks/sql.py:113: in process_batch
    self.bulk_insert_records(
.venv/lib/python3.9/site-packages/singer_sdk/sinks/sql.py:193: in bulk_insert_records
    self.connector.connection.execute(
.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1380: in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
.venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:333: in _execute_on_connection
    return connection._execute_clauseelement(
.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1572: in _execute_clauseelement
    ret = self._execute_context(
.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1943: in _execute_context
    self._handle_dbapi_exception(
.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:2124: in _handle_dbapi_exception
    util.raise_(
.venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:208: in raise_
    raise exception
.venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1880: in _execute_context
    self.dialect.do_executemany(

Upserts aren't implemented yet; working on that now.

@visch
Member Author

visch commented Aug 15, 2022

There's also a new error during the first run, a sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'. Not certain what this is from:

E           sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
E           [SQL: INSERT INTO test_user_in_location (id, user_id, location_id, info) VALUES (%(id)s, %(user_id)s, %(location_id)s, %(info)s)]
E           [parameters: ({'id': 1, 'user_id': 1, 'location_id': 1, 'info': {'weather': 'rainy', 'mood': 'sad'}}, {'id': 2, 'user_id': 1, 'location_id': 2, 'info': {'weather': 'sunny', 'mood': 'satisfied'}}, {'id': 3, 'user_id': 1, 'location_id': 3, 'info': {'weather': 'sunny', 'mood': 'happy'}})]
E           (Background on this error at: https://sqlalche.me/e/14/f405)
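For reference, psycopg2 can't bind a plain Python `dict` as a query parameter; a common workaround (aside from registering `psycopg2.extras.Json` as an adapter) is serializing nested objects to JSON strings before the insert. A minimal stdlib-only sketch of that idea, with a hypothetical helper name:

```python
import json

def serialize_nested(record: dict) -> dict:
    """Return a copy of the record with dict/list values dumped to JSON
    strings so a DB-API driver can bind them as text/jsonb parameters."""
    return {
        key: json.dumps(value) if isinstance(value, (dict, list)) else value
        for key, value in record.items()
    }

row = {"id": 1, "info": {"weather": "rainy", "mood": "sad"}}
print(serialize_nested(row))
# {'id': 1, 'info': '{"weather": "rainy", "mood": "sad"}'}
```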

@BuzzCutNorman
Contributor

BuzzCutNorman commented Aug 15, 2022

That is from your insert, so you are past the table check and build-out. Maybe check bulk_insert_records(); that's right, it just got refactored to allow for overriding. Check generate_insert_statement as well. I am not sure how we are supposed to properly override it.

cc @edgarrmondragon

@edgarrmondragon
Member

generate_insert_statement

@BuzzCutNorman The API is a WIP, and generate_insert_statement is currently expected to return an insert statement as a string. I have not surveyed different databases for upsert methods, but I think most should be able to extend the basic INSERT INTO <table> (<columns...>) VALUES (...) with something like ON CONFLICT (<pk...>) DO UPDATE SET .... So, the overriding pattern could look something like:

def generate_insert_statement(
    self,
    full_table_name: str,
    schema: dict,
) -> str:
    stmt = super().generate_insert_statement(full_table_name, schema)
    stmt += " ON CONFLICT ..."
    return stmt

Wdyt?

@BuzzCutNorman
Contributor

BuzzCutNorman commented Aug 15, 2022

@edgarrmondragon I wish I were good enough at Python to have a definite, educated opinion on that; it looks good to me. The only question I have is why manually create the insert statement when you could let SQLAlchemy Core do it for you. Then it can also be adapted to use any fast helper functions, like psycopg2's fast execution helpers. I overloaded bulk_insert_records() and used the following code, which creates the batch inserts for me and then executes them. I got the idea from the Psycopg2 Fast Execution Helpers docs:

By “qualifying statements”, we mean that the statement being executed must be a Core insert(), update() or delete() construct, and not a plain textual SQL string or one constructed using text(). When using the ORM, all insert/update/delete statements used by the ORM flush process are qualifying.

insert_sql = sqlalchemy.insert(self.connector.get_table(full_table_name))
self.connector.connection.engine.execute(
    insert_sql,
    records,
)
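The snippet above targets SQLAlchemy Core with the Postgres driver; the same "one statement plus a list of parameter dicts" batching shape can be illustrated with the stdlib `sqlite3` driver (table and column names here are taken from the test data earlier in the thread):

```python
import sqlite3

# Illustrative only: the comment above uses SQLAlchemy Core's insert()
# construct; the stdlib sqlite3 driver shows the same executemany shape
# of one statement plus a list of parameter dicts.
records = [
    {"id": 1, "user_id": 1, "location_id": 1},
    {"id": 2, "user_id": 1, "location_id": 2},
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE test_user_in_location "
    "(id INTEGER PRIMARY KEY, user_id INTEGER, location_id INTEGER)"
)
conn.executemany(
    "INSERT INTO test_user_in_location (id, user_id, location_id) "
    "VALUES (:id, :user_id, :location_id)",
    records,
)
count = conn.execute("SELECT COUNT(*) FROM test_user_in_location").fetchone()[0]
print(count)  # 2
```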

@BuzzCutNorman
Contributor

@visch sweet, the title looks like it is working for you now 🎈 🥳 🎈

@visch
Member Author

visch commented Aug 15, 2022

@visch sweet, the title looks like it is working for you now 🎈 🥳 🎈

I went with my previous iteration for checking type differences, but it does work, at least for target-postgres's use case.

@BuzzCutNorman
Contributor

I went with my previous iteration for checking Type differences! But it does work at least for target-postgres's use case
Thanks for giving it a try. 🙏

@edgarrmondragon
Member

@edgarrmondragon I wish I was good enough at python to have a definite educated opinion on that. it looks good to me. The only question I have is why manually create the insert statement when you could let sqlalchemy core do it for you.

That'd be ideal, though it would require target devs to rely on dialect-specific SQLAlchemy functions, like sqlalchemy.dialects.sqlite::insert. I'd be willing to support both string and Insert object return types, and perhaps slowly deprecate the former if no problems come up with the latter. Wdyt?


@aaronsteers one thing that's perhaps not too clear is where the responsibility of the SQLConnector ends and that of SQLSink begins. I'm not sure if this is an expected use of the API to retrieve engine and MetaData objects:

import sqlalchemy
from sqlalchemy.dialects.sqlite import insert

class SQLiteSink(SQLSink):
    ...
    def generate_insert_statement(
        self,
        full_table_name: str,
        schema: dict,
    ):
        engine = self.connector.create_sqlalchemy_engine()
        meta = sqlalchemy.MetaData(bind=engine)
        table = sqlalchemy.Table(full_table_name, meta, autoload=True)
        statement = insert(table)

        if self.key_properties:
            statement = statement.on_conflict_do_update(
                index_elements=table.primary_key.columns,
                set_={
                    column.name: getattr(statement.excluded, column.name)
                    for column in table.columns
                },
            )

        return statement
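The `ON CONFLICT ... DO UPDATE` behavior that `on_conflict_do_update` renders can be checked directly with raw SQL against the stdlib `sqlite3` driver (SQLite 3.24+); a minimal sketch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

upsert = (
    "INSERT INTO users (id, name) VALUES (:id, :name) "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
)
conn.execute(upsert, {"id": 1, "name": "first"})
conn.execute(upsert, {"id": 1, "name": "second"})  # same PK: updates in place

rows = conn.execute("SELECT id, name FROM users").fetchall()
print(rows)  # [(1, 'second')] - one row, updated rather than duplicated
```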

@BuzzCutNorman
Contributor

That'd be ideal. Though it would require target devs to rely on dialect-specific SQLAlchemy functions, like sqlalchemy.dialects.sqlite::insert. I'd be willing to support both a string and Insert object return types and perhaps slowly deprecate the former if no problems come up with the latter. Wdyt?

After working with this a little, I thought maybe this might be a way: in the cookiecutter for a SQL target, what if you asked what dialect they were going to use? Then add that as an import at the top of the sink.py file, and leave a version of generate_insert_statement in the class targetnameSink(SQLSink): that calls the super() of it. Just a thought; let me know what you think.

@aaronsteers
Contributor

@kgpayne, @BuzzCutNorman, @edgarrmondragon - Circling back on this...

I believe this is resolved as of now, due to recent improvements and bug fixes on the SQL interfaces.

Do I have that right?

@visch
Member Author

visch commented Oct 28, 2022

generate_insert_statement

@BuzzCutNorman The API is a WIP, and generate_insert_statement is currently expected to return an insert statement as a string. I have not surveyed different databases for upsert methods, but I think most should be able to extend the basic INSERT INTO <table> (<columns...>) VALUES (...) with something like ON CONFLICT (<pk...>) DO UPDATE SET .... So, the overriding pattern could look something like:

def generate_insert_statement(
    self,
    full_table_name: str,
    schema: dict,
) -> str:
    stmt = super().generate_insert_statement(full_table_name, schema)
    stmt += " ON CONFLICT ..."
    return stmt

Wdyt?

Created an issue #22 for this

@aaronsteers I'm not certain if the main SDK fixes this now or not. I'll have to test a fresh cookiecutter, and after I do that I'll close up this issue. The key for this repo is that this is fixed now :)

@aaronsteers
Contributor

aaronsteers commented Nov 1, 2022

From @edgarrmondragon:

@aaronsteers one thing that's perhaps not too clear is where responsibility of the SQLConnector ends and that of SQLSink begins.

I've given this tons of thought. Sorry, I missed your question above at the time and did not reply directly.

I propose the distinction:

  • SQLConnector knows everything about how to orchestrate the backend database and nothing about the Singer spec.
  • SQLSink conversely knows almost nothing about the backend database dialect but very much about Singer and the Singer SDK protocols.

This means that SQLConnector should necessarily take on all dialect-related questions and SQL text generation. And SQLSink/SQLTarget might make some high level strategic decisions and/or they may select a viable algorithm/approach from a set of the available approaches - but those are not at all responsible for SQL dialect and SQL DDL expressions themselves.

Wdyt?
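A hypothetical skeleton of that split (all names other than `SQLConnector`/`SQLSink` are illustrative, not SDK API):

```python
class SQLConnector:
    """Knows the database dialect; owns all SQL text and DDL generation."""

    def generate_upsert_statement(self, table: str, keys: list) -> str:
        # Hypothetical method name - dialect-specific SQL lives here.
        set_clause = ", ".join(f"{k} = excluded.{k}" for k in keys)
        return (
            f"INSERT INTO {table} DEFAULT VALUES "
            f"ON CONFLICT ({', '.join(keys)}) DO UPDATE SET {set_clause}"
        )


class SQLSink:
    """Knows the Singer protocol; picks a strategy but never writes SQL."""

    def __init__(self, connector: SQLConnector):
        self.connector = connector

    def process_batch(self, table: str, key_properties: list) -> str:
        # The strategic decision (upsert vs. plain append) is made here;
        # the dialect-specific text comes from the connector.
        if key_properties:
            return self.connector.generate_upsert_statement(table, key_properties)
        return f"INSERT INTO {table} DEFAULT VALUES"


stmt = SQLSink(SQLConnector()).process_batch("users", ["id"])
print(stmt)
```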

@aaronsteers
Contributor

@visch - re:

Created an issue #22 for this

Fantastic - thanks!

@aaronsteers I'm not certain if the main SDK fixes this now or not. I'll have to test a fresh cookiecutter, and after I do that I'll close up this issue. The key for this repo is that this is fixed now :)

👍 Thanks!

@visch
Member Author

visch commented Nov 20, 2022

@aaronsteers this works well from a new cookiecutter target!

@visch visch closed this as completed Nov 20, 2022