Skip to content

Commit

Permalink
Merge pull request #418 from griffinqiu/vertica_hook
Browse files Browse the repository at this point in the history
Add Vertica Database support for Airflow
  • Loading branch information
mistercrunch committed Sep 18, 2015
2 parents 3e80e90 + 0fff64b commit df79e59
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'dbapi_hook': ['DbApiHook'],
'mssql_hook': ['MsSqlHook'],
'oracle_hook': ['OracleHook'],
'vertica_hook': ['VerticaHook'],
}

_import_module_attrs(globals(), _hooks)
Expand Down
32 changes: 32 additions & 0 deletions airflow/hooks/vertica_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from vertica_python import connect

from airflow.hooks.dbapi_hook import DbApiHook

class VerticaHook(DbApiHook):
'''
Interact with Vertica.
'''

conn_name_attr = 'vertica_conn_id'
default_conn_name = 'vertica_default'
supports_autocommit = True

def get_conn(self):
"""
Returns verticaql connection object
"""
conn = self.get_connection(self.vertica_conn_id)
conn_config = {
"user": conn.login,
"password": conn.password,
"database": conn.schema,
}

conn_config["host"] = conn.host or 'localhost'
if not conn.port:
conn_config["port"] = 5433
else:
conn_config["port"] = int(conn.port)

conn = connect(**conn_config)
return conn
2 changes: 2 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ def get_hook(self):
return hooks.MsSqlHook(mssql_conn_id=self.conn_id)
elif self.conn_type == 'oracle':
return hooks.OracleHook(oracle_conn_id=self.conn_id)
elif self.conn_type == 'vertica':
return hooks.VerticaHook(vertica_conn_id=self.conn_id)
except:
return None

Expand Down
2 changes: 2 additions & 0 deletions airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
'mysql_operator': ['MySqlOperator'],
'sqlite_operator': ['SqliteOperator'],
'mysql_to_hive': ['MySqlToHiveTransfer'],
'vertica_operator': ['VerticaOperator'],
'vertica_to_hive': ['VerticaToHiveTransfer'],
'postgres_operator': ['PostgresOperator'],
'sensors': [
'SqlSensor',
Expand Down
33 changes: 33 additions & 0 deletions airflow/operators/vertica_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging

from airflow.hooks import VerticaHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class VerticaOperator(BaseOperator):
"""
Executes sql code in a specific Vertica database
:param vertica_conn_id: reference to a specific Vertica database
:type vertica_conn_id: string
:param sql: the sql code to be executed
:type sql: Can receive a str representing a sql statement,
a list of str (sql statements), or reference to a template file.
Template reference are recognized by str ending in '.sql'
"""

template_fields = ('sql',)
template_ext = ('.sql',)
ui_color = '#b4e0ff'

@apply_defaults
def __init__(self, sql, vertica_conn_id='vertica_default', *args, **kwargs):
super(VerticaOperator, self).__init__(*args, **kwargs)
self.vertica_conn_id = vertica_conn_id
self.sql = sql

def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
hook.run(self.sql)
113 changes: 113 additions & 0 deletions airflow/operators/vertica_to_hive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from builtins import chr
from collections import OrderedDict
import unicodecsv as csv
import logging
from tempfile import NamedTemporaryFile

from airflow.hooks import HiveCliHook, VerticaHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults

class VerticaToHiveTransfer(BaseOperator):
"""
Moves data from Vertia to Hive. The operator runs
your query against Vertia, stores the file locally
before loading it into a Hive table. If the ``create`` or
``recreate`` arguments are set to ``True``,
a ``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursor's metadata.
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
large amount of data is loaded and/or if the table gets
queried considerably, you may want to use this operator only to
stage the data into a temporary table before loading it into its
final destination using a ``HiveOperator``.
:param sql: SQL query to execute against the Vertia database
:type sql: str
:param hive_table: target Hive table, use dot notation to target a
specific database
:type hive_table: str
:param create: whether to create the table if it doesn't exist
:type create: bool
:param recreate: whether to drop and recreate the table at every execution
:type recreate: bool
:param partition: target partition as a dict of partition columns and values
:type partition: dict
:param delimiter: field delimiter in the file
:type delimiter: str
:param vertica_conn_id: source Vertica connection
:type vertica_conn_id: str
:param hive_conn_id: destination hive connection
:type hive_conn_id: str
"""

template_fields = ('sql', 'partition', 'hive_table')
template_ext = ('.sql',)
ui_color = '#b4e0ff'

@apply_defaults
def __init__(
self,
sql,
hive_table,
create=True,
recreate=False,
partition=None,
delimiter=chr(1),
vertica_conn_id='vertica_default',
hive_cli_conn_id='hive_cli_default',
*args, **kwargs):
super(VerticaToHiveTransfer, self).__init__(*args, **kwargs)
self.sql = sql
self.hive_table = hive_table
self.partition = partition
self.create = create
self.recreate = recreate
self.delimiter = str(delimiter)
self.vertica_conn_id = vertica_conn_id
self.hive_cli_conn_id = hive_cli_conn_id
self.partition = partition or {}

@classmethod
def type_map(cls, vertica_type):
# vertica-python datatype.py donot provied the full type mapping access.
# Manual hack. Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py
d = {
5: 'BOOLEAN',
6: 'INT',
7: 'FLOAT',
8: 'STRING',
9: 'STRING',
16: 'FLOAT',
}
return d[vertica_type] if vertica_type in d else 'STRING'

def execute(self, context):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)

logging.info("Dumping Vertica query results to local file")
conn = vertica.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
with NamedTemporaryFile("w") as f:
csv_writer = csv.writer(f, delimiter=self.delimiter, encoding='utf-8')
field_dict = OrderedDict()
col_count = 0
for field in cursor.description:
col_count += 1
col_position = "Column{position}".format(position=col_count)
field_dict[col_position if field[0] == '' else field[0]] = self.type_map(field[1])
csv_writer.writerows(cursor.iterate())
f.flush()
cursor.close()
conn.close()
logging.info("Loading file into Hive")
hive.load_file(
f.name,
self.hive_table,
field_dict=field_dict,
create=self.create,
partition=self.partition,
delimiter=self.delimiter,
recreate=self.recreate)
8 changes: 8 additions & 0 deletions airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ def initdb():
host='localhost', port=1433))
session.commit()

conn = session.query(C).filter(C.conn_id == 'vertica_default').first()
if not conn:
session.add(
models.Connection(
conn_id='vertica_default', conn_type='vertica',
host='localhost', port=5433))
session.commit()

# Known event types
KET = models.KnownEventType
if not session.query(KET).filter(KET.know_event_type == 'Holiday').first():
Expand Down
1 change: 1 addition & 0 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
('mysql', 'MySQL',),
('postgres', 'Postgres',),
('oracle', 'Oracle',),
('vertica', 'Vertica',),
('presto', 'Presto',),
('s3', 'S3',),
('samba', 'Samba',),
Expand Down
3 changes: 3 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ Here's the list of the subpackages and what they enable:
| mssql | ``pip install airflow[mssql]`` | Microsoft SQL operators and hook, |
| | | support as an Airflow backend |
+-------------+------------------------------------+------------------------------------------------+
| vertica | ``pip install airflow[vertica]`` | Vertica hook |
| | | support as an Airflow backend |
+-------------+------------------------------------+------------------------------------------------+
| slack | ``pip install airflow[slack]`` | ``SlackAPIPostOperator`` |
+-------------+------------------------------------+------------------------------------------------+
| all | ``pip install airflow[all]`` | All Airflow features known to man |
Expand Down
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
slack = ['slackclient>=0.15']
crypto = ['cryptography>=0.9.3']
oracle = ['cx_Oracle>=5.1.2']
vertica = ['vertica-python>=0.5.1']

all_dbs = postgres + mysql + hive + mssql + hdfs
all_dbs = postgres + mysql + hive + mssql + hdfs + vertica
devel = all_dbs + doc + samba + s3 + ['nose'] + slack + crypto + oracle

setup(
Expand Down Expand Up @@ -80,6 +81,7 @@
'slack': slack,
'crypto': crypto,
'oracle': oracle,
'vertica': vertica,
},
author='Maxime Beauchemin',
author_email='[email protected]',
Expand Down

0 comments on commit df79e59

Please sign in to comment.