diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index 5ca7ae438c47c..11957ffe2df4f 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -46,6 +46,12 @@ def get_conn(self): def get_pandas_df(self, sql, parameters=None): ''' Executes the sql and returns a pandas dataframe + + :param sql: the sql statement to be executed (str) or a list of + sql statements to execute + :type sql: str or list + :param parameters: The parameters to render the SQL query with. + :type parameters: mapping or iterable ''' import pandas.io.sql as psql conn = self.get_conn() @@ -56,6 +62,12 @@ def get_pandas_df(self, sql, parameters=None): def get_records(self, sql, parameters=None): ''' Executes the sql and returns a set of records. + + :param sql: the sql statement to be executed (str) or a list of + sql statements to execute + :type sql: str or list + :param parameters: The parameters to render the SQL query with. + :type parameters: mapping or iterable ''' conn = self.get_conn() cur = self.get_cursor() @@ -70,7 +82,13 @@ def get_records(self, sql, parameters=None): def get_first(self, sql, parameters=None): ''' - Executes the sql and returns a set of records. + Executes the sql and returns the first resulting row. + + :param sql: the sql statement to be executed (str) or a list of + sql statements to execute + :type sql: str or list + :param parameters: The parameters to render the SQL query with. + :type parameters: mapping or iterable ''' conn = self.get_conn() cur = conn.cursor() @@ -92,6 +110,11 @@ def run(self, sql, autocommit=False, parameters=None): :param sql: the sql statement to be executed (str) or a list of sql statements to execute :type sql: str or list + :param autocommit: What to set the connection's autocommit setting to + before executing the query. + :type autocommit: bool + :param parameters: The parameters to render the SQL query with. + :type parameters: mapping or iterable """ conn = self.get_conn() if isinstance(sql, basestring): @@ -124,6 +147,16 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000): """ A generic way to insert a set of tuples into a table, the whole set of inserts is treated as one transaction + + :param table: Name of the target table + :type table: str + :param rows: The rows to insert into the table + :type rows: iterable of tuples + :param target_fields: The names of the columns to fill in the table + :type target_fields: iterable of strings + :param commit_every: The maximum number of rows to insert in one + transaction. Set to 0 to insert all rows in one transaction. + :type commit_every: int """ if target_fields: target_fields = ", ".join(target_fields) @@ -147,7 +180,7 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000): target_fields, ",".join(values)) cur.execute(sql) - if i % commit_every == 0: + if commit_every and i % commit_every == 0: conn.commit() logging.info( "Loaded {i} into {table} rows so far".format(**locals())) @@ -173,5 +206,10 @@ def _serialize_cell(cell): def bulk_load(self, table, tmp_file): """ Loads a tab-delimited file into a database table + + :param table: The name of the target table + :type table: str + :param tmp_file: The path of the file to load into the table + :type tmp_file: str """ raise NotImplementedError()