From 70e5f308137364a4b76cfac65dba9f61fb604f0a Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Wed, 16 Sep 2015 15:43:55 -0400 Subject: [PATCH 1/5] add FTPHook --- airflow/hooks/__init__.py | 1 + airflow/hooks/ftp_hook.py | 197 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 airflow/hooks/ftp_hook.py diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index 9bf5410affdae..45db5640845aa 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -6,6 +6,7 @@ from airflow.hooks.base_hook import BaseHook as _BaseHook _hooks = { + 'ftp_hook': ['FTPHook'], 'hive_hooks': [ 'HiveCliHook', 'HiveMetastoreHook', diff --git a/airflow/hooks/ftp_hook.py b/airflow/hooks/ftp_hook.py new file mode 100644 index 0000000000000..867b7763464c0 --- /dev/null +++ b/airflow/hooks/ftp_hook.py @@ -0,0 +1,197 @@ +import datetime +import ftplib +import logging +import os.path +from airflow.hooks.base_hook import BaseHook +from past.builtins import basestring + + +def mlsd(conn, path="", facts=[]): + ''' + BACKPORT FROM PYTHON3 FTPLIB + + List a directory in a standardized format by using MLSD + command (RFC-3659). If path is omitted the current directory + is assumed. "facts" is a list of strings representing the type + of information desired (e.g. ["type", "size", "perm"]). + + Return a generator object yielding a tuple of two elements + for every file found in path. + First element is the file name, the second one is a dictionary + including a variable number of "facts" depending on the server + and whether "facts" argument has been provided. + ''' + if facts: + conn.sendcmd("OPTS MLST " + ";".join(facts) + ";") + if path: + cmd = "MLSD %s" % path + else: + cmd = "MLSD" + lines = [] + conn.retrlines(cmd, lines.append) + for line in lines: + facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ') + entry = {} + for fact in facts_found[:-1].split(";"): + key, _, value = fact.partition("=") + entry[key.lower()] = value + yield (name, entry) + + +class FTPHook(BaseHook): + + """ + Interact with FTP. + + Errors that may occur throughout but should be handled + downstream. + """ + + def __init__(self, ftp_conn_id='ftp_default'): + self.ftp_conn_id = ftp_conn_id + self.conn = None + + def get_conn(self): + """ + Returns a FTP connection object + """ + if self.conn is None: + params = self.get_connection(self.ftp_conn_id) + self.conn = ftplib.FTP(params.host, params.login, params.password) + + return self.conn + + def close_conn(self): + """ + Closes the connection. An error will occur if the + connection wasnt ever opened. + """ + conn = self.conn + conn.quit() + + def describe_directory(self, path): + """ + Returns a dictionary of {filename: {attributes}} for all files + on the remote system (where the MLSD command is supported). + + :param path: full path to the remote directory + :type path: str + """ + conn = self.get_conn() + conn.cwd(path) + try: + # only works in Python 3 + files = dict(conn.mlsd()) + except AttributeError: + files = dict(mlsd(conn)) + return files + + def list_directory(self, path, nlst=False): + """ + Returns a list of files on the remote system. + + :param path: full path to the remote directory to list + :type path: str + """ + conn = self.get_conn() + conn.cwd(path) + + files = conn.nlst() + return files + + def create_directory(self, path): + """ + Creates a directory on the remote system. + + :param path: full path to the remote directory to create + :type path: str + """ + conn = self.get_conn() + conn.mkd(path) + + def delete_directory(self, path): + """ + Deletes a directory on the remote system. + + :param path: full path to the remote directory to delete + :type path: str + """ + conn = self.get_conn() + conn.rmd(path) + + def retrieve_file(self, remote_full_path, local_full_path_or_buffer): + """ + Transfers the remote file to a local location. + + If local_full_path_or_buffer is a string path, the file will be put + at that location; if it is a file-like buffer, the file will + be written to the buffer but not closed. + + :param remote_full_path: full path to the remote file + :type remote_full_path: str + :param local_full_path_or_buffer: full path to the local file or a + file-like buffer + :type local_full_path: str or file-like buffer + """ + conn = self.get_conn() + + is_path = isinstance(local_full_path_or_buffer, basestring) + + if is_path: + output_handle = open(local_full_path_or_buffer, 'wb') + else: + output_handle = local_full_path_or_buffer + + remote_path, remote_file_name = os.path.split(remote_full_path) + conn.cwd(remote_path) + logging.info('Retrieving file from FTP: {}'.format(remote_full_path)) + conn.retrbinary('RETR %s' % remote_file_name, output_handle.write) + logging.info('Finished etrieving file from FTP: {}'.format( + remote_full_path)) + + if is_path: + output_handle.close() + + def store_file(self, remote_full_path, local_full_path_or_buffer): + """ + Transfers a local file to the remote location. + + If local_full_path_or_buffer is a string path, the file will be read + from that location; if it is a file-like buffer, the file will + be read from the buffer but not closed. + + :param remote_full_path: full path to the remote file + :type remote_full_path: str + :param local_full_path_or_buffer: full path to the local file or a + file-like buffer + :type local_full_path_or_buffer: str or file-like buffer + """ + conn = self.get_conn() + + is_path = isinstance(local_full_path_or_buffer, basestring) + + if is_path: + input_handle = open(local_full_path_or_buffer, 'rb') + else: + input_handle = local_full_path_or_buffer + remote_path, remote_file_name = os.path.split(remote_full_path) + conn.cwd(remote_path) + conn.storbinary('STOR %s' % remote_file_name, input_handle) + + if is_path: + input_handle.close() + + def delete_file(self, path): + """ + Removes a file on the FTP Server + + :param path: full path to the remote file + :type path: str + """ + conn = self.get_conn() + conn.delete(path) + + def get_mod_time(self, path): + conn = self.get_conn() + ftp_mdtm = conn.sendcmd('MDTM ' + path) + return datetime.datetime.strptime(ftp_mdtm[4:], '%Y%m%d%H%M%S') From 813279859c7d47877e5dfd180e640413ab8559e9 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Wed, 16 Sep 2015 18:26:35 -0400 Subject: [PATCH 2/5] move FTPHook to contrib folder --- airflow/contrib/hooks/__init__.py | 11 ++ airflow/contrib/hooks/ftp_hook.py | 197 ++++++++++++++++++++++++++++++ airflow/hooks/__init__.py | 1 + 3 files changed, 209 insertions(+) create mode 100644 airflow/contrib/hooks/__init__.py create mode 100644 airflow/contrib/hooks/ftp_hook.py diff --git a/airflow/contrib/hooks/__init__.py b/airflow/contrib/hooks/__init__.py new file mode 100644 index 0000000000000..7ef723417f542 --- /dev/null +++ b/airflow/contrib/hooks/__init__.py @@ -0,0 +1,11 @@ +''' +Imports the hooks dynamically while keeping the package API clean, +abstracting the underlying modules +''' +from airflow.utils import import_module_attrs as _import_module_attrs + +_hooks = { + 'ftp_hook': ['FTPHook'], +} + +_import_module_attrs(globals(), _hooks) diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py new file mode 100644 index 0000000000000..867b7763464c0 --- /dev/null +++ b/airflow/contrib/hooks/ftp_hook.py @@ -0,0 +1,197 @@ +import datetime +import ftplib +import logging +import os.path +from airflow.hooks.base_hook import BaseHook +from past.builtins import basestring + + +def mlsd(conn, path="", facts=[]): + ''' + BACKPORT FROM PYTHON3 FTPLIB + + List a directory in a standardized format by using MLSD + command (RFC-3659). If path is omitted the current directory + is assumed. "facts" is a list of strings representing the type + of information desired (e.g. ["type", "size", "perm"]). + + Return a generator object yielding a tuple of two elements + for every file found in path. + First element is the file name, the second one is a dictionary + including a variable number of "facts" depending on the server + and whether "facts" argument has been provided. + ''' + if facts: + conn.sendcmd("OPTS MLST " + ";".join(facts) + ";") + if path: + cmd = "MLSD %s" % path + else: + cmd = "MLSD" + lines = [] + conn.retrlines(cmd, lines.append) + for line in lines: + facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ') + entry = {} + for fact in facts_found[:-1].split(";"): + key, _, value = fact.partition("=") + entry[key.lower()] = value + yield (name, entry) + + +class FTPHook(BaseHook): + + """ + Interact with FTP. + + Errors that may occur throughout but should be handled + downstream. + """ + + def __init__(self, ftp_conn_id='ftp_default'): + self.ftp_conn_id = ftp_conn_id + self.conn = None + + def get_conn(self): + """ + Returns a FTP connection object + """ + if self.conn is None: + params = self.get_connection(self.ftp_conn_id) + self.conn = ftplib.FTP(params.host, params.login, params.password) + + return self.conn + + def close_conn(self): + """ + Closes the connection. An error will occur if the + connection wasnt ever opened. + """ + conn = self.conn + conn.quit() + + def describe_directory(self, path): + """ + Returns a dictionary of {filename: {attributes}} for all files + on the remote system (where the MLSD command is supported). + + :param path: full path to the remote directory + :type path: str + """ + conn = self.get_conn() + conn.cwd(path) + try: + # only works in Python 3 + files = dict(conn.mlsd()) + except AttributeError: + files = dict(mlsd(conn)) + return files + + def list_directory(self, path, nlst=False): + """ + Returns a list of files on the remote system. + + :param path: full path to the remote directory to list + :type path: str + """ + conn = self.get_conn() + conn.cwd(path) + + files = conn.nlst() + return files + + def create_directory(self, path): + """ + Creates a directory on the remote system. + + :param path: full path to the remote directory to create + :type path: str + """ + conn = self.get_conn() + conn.mkd(path) + + def delete_directory(self, path): + """ + Deletes a directory on the remote system. + + :param path: full path to the remote directory to delete + :type path: str + """ + conn = self.get_conn() + conn.rmd(path) + + def retrieve_file(self, remote_full_path, local_full_path_or_buffer): + """ + Transfers the remote file to a local location. + + If local_full_path_or_buffer is a string path, the file will be put + at that location; if it is a file-like buffer, the file will + be written to the buffer but not closed. + + :param remote_full_path: full path to the remote file + :type remote_full_path: str + :param local_full_path_or_buffer: full path to the local file or a + file-like buffer + :type local_full_path: str or file-like buffer + """ + conn = self.get_conn() + + is_path = isinstance(local_full_path_or_buffer, basestring) + + if is_path: + output_handle = open(local_full_path_or_buffer, 'wb') + else: + output_handle = local_full_path_or_buffer + + remote_path, remote_file_name = os.path.split(remote_full_path) + conn.cwd(remote_path) + logging.info('Retrieving file from FTP: {}'.format(remote_full_path)) + conn.retrbinary('RETR %s' % remote_file_name, output_handle.write) + logging.info('Finished etrieving file from FTP: {}'.format( + remote_full_path)) + + if is_path: + output_handle.close() + + def store_file(self, remote_full_path, local_full_path_or_buffer): + """ + Transfers a local file to the remote location. + + If local_full_path_or_buffer is a string path, the file will be read + from that location; if it is a file-like buffer, the file will + be read from the buffer but not closed. + + :param remote_full_path: full path to the remote file + :type remote_full_path: str + :param local_full_path_or_buffer: full path to the local file or a + file-like buffer + :type local_full_path_or_buffer: str or file-like buffer + """ + conn = self.get_conn() + + is_path = isinstance(local_full_path_or_buffer, basestring) + + if is_path: + input_handle = open(local_full_path_or_buffer, 'rb') + else: + input_handle = local_full_path_or_buffer + remote_path, remote_file_name = os.path.split(remote_full_path) + conn.cwd(remote_path) + conn.storbinary('STOR %s' % remote_file_name, input_handle) + + if is_path: + input_handle.close() + + def delete_file(self, path): + """ + Removes a file on the FTP Server + + :param path: full path to the remote file + :type path: str + """ + conn = self.get_conn() + conn.delete(path) + + def get_mod_time(self, path): + conn = self.get_conn() + ftp_mdtm = conn.sendcmd('MDTM ' + path) + return datetime.datetime.strptime(ftp_mdtm[4:], '%Y%m%d%H%M%S') diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index 45db5640845aa..e2e5ba085d63c 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -28,6 +28,7 @@ } _import_module_attrs(globals(), _hooks) +from airflow.contrib.hooks import * def integrate_plugins(): From 05be03c9127644edf065b87ce6fbd9b8c30345de Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Wed, 16 Sep 2015 18:28:13 -0400 Subject: [PATCH 3/5] remove FTPhook from hooks --- airflow/hooks/__init__.py | 1 - airflow/hooks/ftp_hook.py | 197 -------------------------------------- 2 files changed, 198 deletions(-) delete mode 100644 airflow/hooks/ftp_hook.py diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index e2e5ba085d63c..361cafb45ac61 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -6,7 +6,6 @@ from airflow.hooks.base_hook import BaseHook as _BaseHook _hooks = { - 'ftp_hook': ['FTPHook'], 'hive_hooks': [ 'HiveCliHook', 'HiveMetastoreHook', diff --git a/airflow/hooks/ftp_hook.py b/airflow/hooks/ftp_hook.py deleted file mode 100644 index 867b7763464c0..0000000000000 --- a/airflow/hooks/ftp_hook.py +++ /dev/null @@ -1,197 +0,0 @@ -import datetime -import ftplib -import logging -import os.path -from airflow.hooks.base_hook import BaseHook -from past.builtins import basestring - - -def mlsd(conn, path="", facts=[]): - ''' - BACKPORT FROM PYTHON3 FTPLIB - - List a directory in a standardized format by using MLSD - command (RFC-3659). If path is omitted the current directory - is assumed. "facts" is a list of strings representing the type - of information desired (e.g. ["type", "size", "perm"]). - - Return a generator object yielding a tuple of two elements - for every file found in path. - First element is the file name, the second one is a dictionary - including a variable number of "facts" depending on the server - and whether "facts" argument has been provided. - ''' - if facts: - conn.sendcmd("OPTS MLST " + ";".join(facts) + ";") - if path: - cmd = "MLSD %s" % path - else: - cmd = "MLSD" - lines = [] - conn.retrlines(cmd, lines.append) - for line in lines: - facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ') - entry = {} - for fact in facts_found[:-1].split(";"): - key, _, value = fact.partition("=") - entry[key.lower()] = value - yield (name, entry) - - -class FTPHook(BaseHook): - - """ - Interact with FTP. - - Errors that may occur throughout but should be handled - downstream. - """ - - def __init__(self, ftp_conn_id='ftp_default'): - self.ftp_conn_id = ftp_conn_id - self.conn = None - - def get_conn(self): - """ - Returns a FTP connection object - """ - if self.conn is None: - params = self.get_connection(self.ftp_conn_id) - self.conn = ftplib.FTP(params.host, params.login, params.password) - - return self.conn - - def close_conn(self): - """ - Closes the connection. An error will occur if the - connection wasnt ever opened. - """ - conn = self.conn - conn.quit() - - def describe_directory(self, path): - """ - Returns a dictionary of {filename: {attributes}} for all files - on the remote system (where the MLSD command is supported). - - :param path: full path to the remote directory - :type path: str - """ - conn = self.get_conn() - conn.cwd(path) - try: - # only works in Python 3 - files = dict(conn.mlsd()) - except AttributeError: - files = dict(mlsd(conn)) - return files - - def list_directory(self, path, nlst=False): - """ - Returns a list of files on the remote system. - - :param path: full path to the remote directory to list - :type path: str - """ - conn = self.get_conn() - conn.cwd(path) - - files = conn.nlst() - return files - - def create_directory(self, path): - """ - Creates a directory on the remote system. - - :param path: full path to the remote directory to create - :type path: str - """ - conn = self.get_conn() - conn.mkd(path) - - def delete_directory(self, path): - """ - Deletes a directory on the remote system. - - :param path: full path to the remote directory to delete - :type path: str - """ - conn = self.get_conn() - conn.rmd(path) - - def retrieve_file(self, remote_full_path, local_full_path_or_buffer): - """ - Transfers the remote file to a local location. - - If local_full_path_or_buffer is a string path, the file will be put - at that location; if it is a file-like buffer, the file will - be written to the buffer but not closed. - - :param remote_full_path: full path to the remote file - :type remote_full_path: str - :param local_full_path_or_buffer: full path to the local file or a - file-like buffer - :type local_full_path: str or file-like buffer - """ - conn = self.get_conn() - - is_path = isinstance(local_full_path_or_buffer, basestring) - - if is_path: - output_handle = open(local_full_path_or_buffer, 'wb') - else: - output_handle = local_full_path_or_buffer - - remote_path, remote_file_name = os.path.split(remote_full_path) - conn.cwd(remote_path) - logging.info('Retrieving file from FTP: {}'.format(remote_full_path)) - conn.retrbinary('RETR %s' % remote_file_name, output_handle.write) - logging.info('Finished etrieving file from FTP: {}'.format( - remote_full_path)) - - if is_path: - output_handle.close() - - def store_file(self, remote_full_path, local_full_path_or_buffer): - """ - Transfers a local file to the remote location. - - If local_full_path_or_buffer is a string path, the file will be read - from that location; if it is a file-like buffer, the file will - be read from the buffer but not closed. - - :param remote_full_path: full path to the remote file - :type remote_full_path: str - :param local_full_path_or_buffer: full path to the local file or a - file-like buffer - :type local_full_path_or_buffer: str or file-like buffer - """ - conn = self.get_conn() - - is_path = isinstance(local_full_path_or_buffer, basestring) - - if is_path: - input_handle = open(local_full_path_or_buffer, 'rb') - else: - input_handle = local_full_path_or_buffer - remote_path, remote_file_name = os.path.split(remote_full_path) - conn.cwd(remote_path) - conn.storbinary('STOR %s' % remote_file_name, input_handle) - - if is_path: - input_handle.close() - - def delete_file(self, path): - """ - Removes a file on the FTP Server - - :param path: full path to the remote file - :type path: str - """ - conn = self.get_conn() - conn.delete(path) - - def get_mod_time(self, path): - conn = self.get_conn() - ftp_mdtm = conn.sendcmd('MDTM ' + path) - return datetime.datetime.strptime(ftp_mdtm[4:], '%Y%m%d%H%M%S') From cbcee5ec980afeacef739eabc3f20fc11d38e1d4 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Fri, 18 Sep 2015 08:45:43 -0400 Subject: [PATCH 4/5] put FTPHook in contrib module --- airflow/__init__.py | 1 + airflow/contrib/__init__.py | 1 + airflow/hooks/__init__.py | 1 - 3 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 airflow/contrib/__init__.py diff --git a/airflow/__init__.py b/airflow/__init__.py index 07982f9339203..6b089af898874 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -41,6 +41,7 @@ def __init__(self, namespace): from airflow import hooks from airflow import executors from airflow import macros +from airflow import contrib operators.integrate_plugins() hooks.integrate_plugins() diff --git a/airflow/contrib/__init__.py b/airflow/contrib/__init__.py new file mode 100644 index 0000000000000..64ecae2ba5aec --- /dev/null +++ b/airflow/contrib/__init__.py @@ -0,0 +1 @@ +import airflow.contrib.hooks diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index 361cafb45ac61..9bf5410affdae 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -27,7 +27,6 @@ } _import_module_attrs(globals(), _hooks) -from airflow.contrib.hooks import * def integrate_plugins(): From cd94ada28863e5546b72664574a42f2d3e0d8d9e Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin Date: Fri, 18 Sep 2015 08:45:51 -0400 Subject: [PATCH 5/5] add contrib placeholder for operators --- airflow/contrib/__init__.py | 1 + airflow/contrib/operators/__init__.py | 11 +++++++++++ 2 files changed, 12 insertions(+) create mode 100644 airflow/contrib/operators/__init__.py diff --git a/airflow/contrib/__init__.py b/airflow/contrib/__init__.py index 64ecae2ba5aec..0f2017b2e2101 100644 --- a/airflow/contrib/__init__.py +++ b/airflow/contrib/__init__.py @@ -1 +1,2 @@ import airflow.contrib.hooks +import airflow.contrib.operators diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py new file mode 100644 index 0000000000000..010acbb9fd6d3 --- /dev/null +++ b/airflow/contrib/operators/__init__.py @@ -0,0 +1,11 @@ +''' +Imports the operators dynamically while keeping the package API clean, +abstracting the underlying modules +''' +from airflow.utils import import_module_attrs as _import_module_attrs + +_operators = { + # 'example': ['ExampleOperator'], +} + +_import_module_attrs(globals(), _operators)