diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 9bf5410affdae..af2a7a5747724 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -24,6 +24,7 @@
     'dbapi_hook': ['DbApiHook'],
     'mssql_hook': ['MsSqlHook'],
     'oracle_hook': ['OracleHook'],
+    'vertica_hook': ['VerticaHook'],
 }
 
 _import_module_attrs(globals(), _hooks)
diff --git a/airflow/hooks/vertica_hook.py b/airflow/hooks/vertica_hook.py
new file mode 100644
index 0000000000000..45cd0d0e6f434
--- /dev/null
+++ b/airflow/hooks/vertica_hook.py
@@ -0,0 +1,32 @@
+from vertica_python import connect
+
+from airflow.hooks.dbapi_hook import DbApiHook
+
+class VerticaHook(DbApiHook):
+    '''
+    Interact with Vertica.
+    '''
+
+    conn_name_attr = 'vertica_conn_id'
+    default_conn_name = 'vertica_default'
+    supports_autocommit = True
+
+    def get_conn(self):
+        """
+        Returns a Vertica connection object.
+        """
+        conn = self.get_connection(self.vertica_conn_id)
+        conn_config = {
+            "user": conn.login,
+            "password": conn.password,
+            "database": conn.schema,
+        }
+
+        conn_config["host"] = conn.host or 'localhost'
+        if not conn.port:
+            conn_config["port"] = 5433
+        else:
+            conn_config["port"] = int(conn.port)
+
+        conn = connect(**conn_config)
+        return conn
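For illustration, a minimal sketch of exercising the new hook directly. It assumes the ``vertica_default`` connection seeded by ``initdb`` below points at a reachable Vertica instance, and uses the ``get_records`` helper inherited from ``DbApiHook``:

    from airflow.hooks import VerticaHook

    # Connection details (host, port, user, password, database) come from
    # the 'vertica_default' row in Airflow's connections table.
    hook = VerticaHook(vertica_conn_id='vertica_default')
    records = hook.get_records("SELECT version()")
    print(records)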
diff --git a/airflow/models.py b/airflow/models.py
index c2b86293459e6..eb2825d33ae18 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -390,6 +390,8 @@ def get_hook(self):
                 return hooks.MsSqlHook(mssql_conn_id=self.conn_id)
             elif self.conn_type == 'oracle':
                 return hooks.OracleHook(oracle_conn_id=self.conn_id)
+            elif self.conn_type == 'vertica':
+                return hooks.VerticaHook(vertica_conn_id=self.conn_id)
         except:
             return None
 
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index a6c810abf8deb..0f346c4c058a7 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -28,6 +28,8 @@
     'mysql_operator': ['MySqlOperator'],
     'sqlite_operator': ['SqliteOperator'],
     'mysql_to_hive': ['MySqlToHiveTransfer'],
+    'vertica_operator': ['VerticaOperator'],
+    'vertica_to_hive': ['VerticaToHiveTransfer'],
     'postgres_operator': ['PostgresOperator'],
     'sensors': [
         'SqlSensor',
diff --git a/airflow/operators/vertica_operator.py b/airflow/operators/vertica_operator.py
new file mode 100644
index 0000000000000..fdce789116cfb
--- /dev/null
+++ b/airflow/operators/vertica_operator.py
@@ -0,0 +1,33 @@
+import logging
+
+from airflow.hooks import VerticaHook
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+
+
+class VerticaOperator(BaseOperator):
+    """
+    Executes SQL code in a specific Vertica database.
+
+    :param vertica_conn_id: reference to a specific Vertica database
+    :type vertica_conn_id: string
+    :param sql: the SQL code to be executed
+    :type sql: Can receive a str representing a SQL statement,
+        a list of str (SQL statements), or a reference to a template file.
+        Template references are recognized by str ending in '.sql'
+    """
+
+    template_fields = ('sql',)
+    template_ext = ('.sql',)
+    ui_color = '#b4e0ff'
+
+    @apply_defaults
+    def __init__(self, sql, vertica_conn_id='vertica_default', *args, **kwargs):
+        super(VerticaOperator, self).__init__(*args, **kwargs)
+        self.vertica_conn_id = vertica_conn_id
+        self.sql = sql
+
+    def execute(self, context):
+        logging.info('Executing: ' + str(self.sql))
+        hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
+        hook.run(self.sql)
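A minimal DAG sketch using the new operator (the dag id, owner, table name, and SQL statement are illustrative placeholders):

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators import VerticaOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2015, 1, 1),
    }

    dag = DAG('vertica_example', default_args=default_args)

    # Runs a single statement against the 'vertica_default' connection;
    # a list of statements or a path to a '.sql' template also works.
    task = VerticaOperator(
        task_id='truncate_staging',
        sql='TRUNCATE TABLE staging.events',
        vertica_conn_id='vertica_default',
        dag=dag)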
diff --git a/airflow/operators/vertica_to_hive.py b/airflow/operators/vertica_to_hive.py
new file mode 100644
index 0000000000000..afc9d18b0c1f2
--- /dev/null
+++ b/airflow/operators/vertica_to_hive.py
@@ -0,0 +1,113 @@
+from builtins import chr
+from collections import OrderedDict
+import unicodecsv as csv
+import logging
+from tempfile import NamedTemporaryFile
+
+from airflow.hooks import HiveCliHook, VerticaHook
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+
+class VerticaToHiveTransfer(BaseOperator):
+    """
+    Moves data from Vertica to Hive. The operator runs
+    your query against Vertica and stores the results in a local
+    file before loading them into a Hive table. If the ``create`` or
+    ``recreate`` arguments are set to ``True``,
+    ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
+    Hive data types are inferred from the cursor's metadata.
+    Note that the table generated in Hive uses ``STORED AS textfile``,
+    which isn't the most efficient serialization format. If a
+    large amount of data is loaded and/or if the table gets
+    queried considerably, you may want to use this operator only to
+    stage the data into a temporary table before loading it into its
+    final destination using a ``HiveOperator``.
+
+    :param sql: SQL query to execute against the Vertica database
+    :type sql: str
+    :param hive_table: target Hive table, use dot notation to target a
+        specific database
+    :type hive_table: str
+    :param create: whether to create the table if it doesn't exist
+    :type create: bool
+    :param recreate: whether to drop and recreate the table at every execution
+    :type recreate: bool
+    :param partition: target partition as a dict of partition columns and values
+    :type partition: dict
+    :param delimiter: field delimiter in the file
+    :type delimiter: str
+    :param vertica_conn_id: source Vertica connection
+    :type vertica_conn_id: str
+    :param hive_cli_conn_id: destination Hive connection
+    :type hive_cli_conn_id: str
+    """
+
+    template_fields = ('sql', 'partition', 'hive_table')
+    template_ext = ('.sql',)
+    ui_color = '#b4e0ff'
+
+    @apply_defaults
+    def __init__(
+            self,
+            sql,
+            hive_table,
+            create=True,
+            recreate=False,
+            partition=None,
+            delimiter=chr(1),
+            vertica_conn_id='vertica_default',
+            hive_cli_conn_id='hive_cli_default',
+            *args, **kwargs):
+        super(VerticaToHiveTransfer, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.hive_table = hive_table
+        self.create = create
+        self.recreate = recreate
+        self.delimiter = str(delimiter)
+        self.vertica_conn_id = vertica_conn_id
+        self.hive_cli_conn_id = hive_cli_conn_id
+        self.partition = partition or {}
+
+    @classmethod
+    def type_map(cls, vertica_type):
+        # vertica-python does not expose a full mapping of type codes to
+        # type names, so map the common codes manually. Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
+        d = {
+            5: 'BOOLEAN',
+            6: 'INT',
+            7: 'FLOAT',
+            8: 'STRING',
+            9: 'STRING',
+            16: 'FLOAT',
+        }
+        return d.get(vertica_type, 'STRING')
+
+    def execute(self, context):
+        hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
+        vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
+
+        logging.info("Dumping Vertica query results to local file")
+        conn = vertica.get_conn()
+        cursor = conn.cursor()
+        cursor.execute(self.sql)
+        with NamedTemporaryFile("w") as f:
+            csv_writer = csv.writer(f, delimiter=self.delimiter, encoding='utf-8')
+            field_dict = OrderedDict()
+            col_count = 0
+            for field in cursor.description:
+                col_count += 1
+                col_position = "Column{position}".format(position=col_count)
+                field_dict[col_position if field[0] == '' else field[0]] = self.type_map(field[1])
+            csv_writer.writerows(cursor.iterate())
+            f.flush()
+            cursor.close()
+            conn.close()
+            logging.info("Loading file into Hive")
+            hive.load_file(
+                f.name,
+                self.hive_table,
+                field_dict=field_dict,
+                create=self.create,
+                partition=self.partition,
+                delimiter=self.delimiter,
+                recreate=self.recreate)
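And a sketch of the transfer in a DAG (table, query, and partition values are illustrative; ``sql`` and ``partition`` are listed in ``template_fields``, so the ``{{ ds }}`` macros are intended to be rendered at runtime):

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators import VerticaToHiveTransfer

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2015, 1, 1),
    }

    dag = DAG('vertica_to_hive_example', default_args=default_args)

    # Stages one day of Vertica rows into a partitioned Hive staging table.
    stage = VerticaToHiveTransfer(
        task_id='stage_events',
        sql="SELECT * FROM events WHERE ds = '{{ ds }}'",
        hive_table='staging.events',
        partition={'ds': '{{ ds }}'},
        vertica_conn_id='vertica_default',
        hive_cli_conn_id='hive_cli_default',
        dag=dag)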
diff --git a/airflow/utils.py b/airflow/utils.py
index 391039cc62c69..4bfb5ed09af63 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -186,6 +186,14 @@ def initdb():
                 host='localhost', port=1433))
         session.commit()
 
+    conn = session.query(C).filter(C.conn_id == 'vertica_default').first()
+    if not conn:
+        session.add(
+            models.Connection(
+                conn_id='vertica_default', conn_type='vertica',
+                host='localhost', port=5433))
+        session.commit()
+
     # Known event types
     KET = models.KnownEventType
     if not session.query(KET).filter(KET.know_event_type == 'Holiday').first():
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 0aa9ab480ff06..a27080da49296 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -1728,6 +1728,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
        ('mysql', 'MySQL',),
        ('postgres', 'Postgres',),
        ('oracle', 'Oracle',),
+        ('vertica', 'Vertica',),
        ('presto', 'Presto',),
        ('s3', 'S3',),
        ('samba', 'Samba',),
diff --git a/docs/installation.rst b/docs/installation.rst
index c8c1025f20ed9..9fbcf42e81af9 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -42,6 +42,9 @@ Here's the list of the subpackages and what they enable:
 | mssql       | ``pip install airflow[mssql]``     | Microsoft SQL operators and hook,              |
 |             |                                    | support as an Airflow backend                  |
 +-------------+------------------------------------+------------------------------------------------+
+| vertica     | ``pip install airflow[vertica]``   | Vertica hook, ``VerticaOperator`` and          |
+|             |                                    | ``VerticaToHiveTransfer``                      |
++-------------+------------------------------------+------------------------------------------------+
 | slack       | ``pip install airflow[slack]``     | ``SlackAPIPostOperator``                       |
 +-------------+------------------------------------+------------------------------------------------+
 | all         | ``pip install airflow[all]``       | All Airflow features known to man              |
diff --git a/setup.py b/setup.py
index 0a5cc4e38d00a..87c29c395db98 100644
--- a/setup.py
+++ b/setup.py
@@ -27,8 +27,9 @@
 slack = ['slackclient>=0.15']
 crypto = ['cryptography>=0.9.3']
 oracle = ['cx_Oracle>=5.1.2']
+vertica = ['vertica-python>=0.5.1']
 
-all_dbs = postgres + mysql + hive + mssql + hdfs
+all_dbs = postgres + mysql + hive + mssql + hdfs + vertica
 devel = all_dbs + doc + samba + s3 + ['nose'] + slack + crypto + oracle
 
 setup(
@@ -80,6 +81,7 @@
         'slack': slack,
         'crypto': crypto,
         'oracle': oracle,
+        'vertica': vertica,
     },
     author='Maxime Beauchemin',
     author_email='maximebeauchemin@gmail.com',
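Finally, with the extra installed (``pip install airflow[vertica]``), a connection row of type ``vertica`` resolves to the new hook through ``Connection.get_hook``. A minimal sketch, assuming the default metadata session:

    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    conn = session.query(Connection).filter(
        Connection.conn_id == 'vertica_default').first()

    # conn.conn_type is 'vertica', so this returns a VerticaHook.
    hook = conn.get_hook()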