Skip to content

Commit

Permalink
Merge pull request #405 from jlowin/ftp
Browse files Browse the repository at this point in the history
Add FTPHook
  • Loading branch information
mistercrunch committed Sep 18, 2015
2 parents 7453450 + d6a2c79 commit 1fb8f5b
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self, namespace):
from airflow import hooks
from airflow import executors
from airflow import macros
from airflow import contrib

operators.integrate_plugins()
hooks.integrate_plugins()
Expand Down
2 changes: 2 additions & 0 deletions airflow/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import airflow.contrib.hooks
import airflow.contrib.operators
11 changes: 11 additions & 0 deletions airflow/contrib/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'''
Imports the hooks dynamically while keeping the package API clean,
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs

_hooks = {
'ftp_hook': ['FTPHook'],
}

_import_module_attrs(globals(), _hooks)
197 changes: 197 additions & 0 deletions airflow/contrib/hooks/ftp_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import datetime
import ftplib
import logging
import os.path
from airflow.hooks.base_hook import BaseHook
from past.builtins import basestring


def mlsd(conn, path="", facts=[]):
'''
BACKPORT FROM PYTHON3 FTPLIB
List a directory in a standardized format by using MLSD
command (RFC-3659). If path is omitted the current directory
is assumed. "facts" is a list of strings representing the type
of information desired (e.g. ["type", "size", "perm"]).
Return a generator object yielding a tuple of two elements
for every file found in path.
First element is the file name, the second one is a dictionary
including a variable number of "facts" depending on the server
and whether "facts" argument has been provided.
'''
if facts:
conn.sendcmd("OPTS MLST " + ";".join(facts) + ";")
if path:
cmd = "MLSD %s" % path
else:
cmd = "MLSD"
lines = []
conn.retrlines(cmd, lines.append)
for line in lines:
facts_found, _, name = line.rstrip(ftplib.CRLF).partition(' ')
entry = {}
for fact in facts_found[:-1].split(";"):
key, _, value = fact.partition("=")
entry[key.lower()] = value
yield (name, entry)


class FTPHook(BaseHook):

"""
Interact with FTP.
Errors that may occur throughout but should be handled
downstream.
"""

def __init__(self, ftp_conn_id='ftp_default'):
self.ftp_conn_id = ftp_conn_id
self.conn = None

def get_conn(self):
"""
Returns a FTP connection object
"""
if self.conn is None:
params = self.get_connection(self.ftp_conn_id)
self.conn = ftplib.FTP(params.host, params.login, params.password)

return self.conn

def close_conn(self):
"""
Closes the connection. An error will occur if the
connection wasnt ever opened.
"""
conn = self.conn
conn.quit()

def describe_directory(self, path):
"""
Returns a dictionary of {filename: {attributes}} for all files
on the remote system (where the MLSD command is supported).
:param path: full path to the remote directory
:type path: str
"""
conn = self.get_conn()
conn.cwd(path)
try:
# only works in Python 3
files = dict(conn.mlsd())
except AttributeError:
files = dict(mlsd(conn))
return files

def list_directory(self, path, nlst=False):
"""
Returns a list of files on the remote system.
:param path: full path to the remote directory to list
:type path: str
"""
conn = self.get_conn()
conn.cwd(path)

files = conn.nlst()
return files

def create_directory(self, path):
"""
Creates a directory on the remote system.
:param path: full path to the remote directory to create
:type path: str
"""
conn = self.get_conn()
conn.mkd(path)

def delete_directory(self, path):
"""
Deletes a directory on the remote system.
:param path: full path to the remote directory to delete
:type path: str
"""
conn = self.get_conn()
conn.rmd(path)

def retrieve_file(self, remote_full_path, local_full_path_or_buffer):
"""
Transfers the remote file to a local location.
If local_full_path_or_buffer is a string path, the file will be put
at that location; if it is a file-like buffer, the file will
be written to the buffer but not closed.
:param remote_full_path: full path to the remote file
:type remote_full_path: str
:param local_full_path_or_buffer: full path to the local file or a
file-like buffer
:type local_full_path: str or file-like buffer
"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, basestring)

if is_path:
output_handle = open(local_full_path_or_buffer, 'wb')
else:
output_handle = local_full_path_or_buffer

remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
logging.info('Retrieving file from FTP: {}'.format(remote_full_path))
conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
logging.info('Finished etrieving file from FTP: {}'.format(
remote_full_path))

if is_path:
output_handle.close()

def store_file(self, remote_full_path, local_full_path_or_buffer):
"""
Transfers a local file to the remote location.
If local_full_path_or_buffer is a string path, the file will be read
from that location; if it is a file-like buffer, the file will
be read from the buffer but not closed.
:param remote_full_path: full path to the remote file
:type remote_full_path: str
:param local_full_path_or_buffer: full path to the local file or a
file-like buffer
:type local_full_path_or_buffer: str or file-like buffer
"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, basestring)

if is_path:
input_handle = open(local_full_path_or_buffer, 'rb')
else:
input_handle = local_full_path_or_buffer
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
conn.storbinary('STOR %s' % remote_file_name, input_handle)

if is_path:
input_handle.close()

def delete_file(self, path):
"""
Removes a file on the FTP Server
:param path: full path to the remote file
:type path: str
"""
conn = self.get_conn()
conn.delete(path)

def get_mod_time(self, path):
conn = self.get_conn()
ftp_mdtm = conn.sendcmd('MDTM ' + path)
return datetime.datetime.strptime(ftp_mdtm[4:], '%Y%m%d%H%M%S')
11 changes: 11 additions & 0 deletions airflow/contrib/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'''
Imports the operators dynamically while keeping the package API clean,
abstracting the underlying modules
'''
from airflow.utils import import_module_attrs as _import_module_attrs

_operators = {
# 'example': ['ExampleOperator'],
}

_import_module_attrs(globals(), _operators)

0 comments on commit 1fb8f5b

Please sign in to comment.