Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[contrib.postgres] copy_from does not accept schema.table notation in most recent psycopg2 versions #3198

Closed
delhomer opened this issue Sep 7, 2022 · 0 comments · Fixed by #3324

Comments

@delhomer
Copy link
Contributor

delhomer commented Sep 7, 2022

Description

I'm trying to maintain an old (2018) project that includes a lot of Luigi tasks, amongst which there are some tasks derived from CopyToTable.

In this project, the PostgreSQL database contains several schemas, and some data may be added to tables which are not in default public schema through CopyToTable-derived tasks.

However CopyToTable uses the cursor.copy_from method, which has been recently modified in psycopg2 API (see e.g psycopg/psycopg2#1294). Hence using Luigi with a recent psycopg2 raises an error like psycopg2.errors.UndefinedTable: relation "schema.table" does not exist.

Expected behavior

Taking into account the behavior change in psycopg2, considering schema.table notation for Postgres tables that are located in a dedicated schema.

Minimal Working Example

Let's consider the following Python module (let's call it luigi_copytotable.py) :

from luigi.contrib.postgres import CopyToTable
import pandas as pd


class SendToDB(CopyToTable):
    """Insert bike availability data into a PostgreSQL table                                  
    """

    host = "localhost"
    database = "my_db"
    user = "my_username"
    password = "my_password"

    columns = [('a', 'VARCHAR'), ('b', 'INT')]

    @property
    def table(self):
        return 'my_schema.my_table'

    def rows(self):
        df = pd.DataFrame({"a": ["foo", "bar", "wiz"], "b": [1, 2, 3]})
        for idx, row in df.iterrows():
            yield row.values

Running luigi --local-scheduler --module luigi_copytotable SendToDB throws:

16:04 $ luigi --local-scheduler --module luigi_copytotable SendToDB_
DEBUG: Checking if SendToDB() is complete
INFO: Informed scheduler that task   SendToDB__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 494717] Worker Worker(salt=450412579, workers=1, host=*******, username=my_username, pid=494717) running   SendToDB_()
INFO: Done writing, importing at 2022-09-07 16:04:05.364381
INFO: Creating table my_schema.my_table
ERROR: [pid 494717] Worker Worker(salt=450412579, workers=1, host=*******, username=my_username, pid=494717) failed    SendToDB()
Traceback (most recent call last):
  File "/home/rdelhome/.virtualenvs/jitenv/lib/python3.10/site-packages/luigi/worker.py", line 198, in run
    new_deps = self._run_get_new_deps()
  File "/home/rdelhome/.virtualenvs/jitenv/lib/python3.10/site-packages/luigi/worker.py", line 138, in _run_get_new_deps
    task_gen = self.task.run()
  File "/home/rdelhome/.virtualenvs/jitenv/lib/python3.10/site-packages/luigi/contrib/postgres.py", line 403, in run
    self.copy(cursor, tmp_file)
  File "/home/rdelhome/.virtualenvs/jitenv/lib/python3.10/site-packages/luigi/contrib/postgres.py", line 358, in copy
    cursor.copy_from(
psycopg2.errors.UndefinedTable: relation "my_schema.my_table" does not exist

DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SendToDB__99914b932b   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=450412579, workers=1, host=*********, username=my_username, pid=494717) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 SendToDB()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

Hints for resolution

As suggested in the psycopg2 issue, use copy_expert ? Or maybe modify the if predicate in https://github.com/spotify/luigi/blob/master/luigi/contrib/postgres.py#L357, to choose the else case if copy_from is not happy...

(Note: As a temporary solution, I've downgraded my  psycopg2 version to <2.9 to make it work.)

Related issue

See on psycopg2 project: psycopg/psycopg2#1294

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant