
Commit

Merge pull request apache#10 in TP/incubator-airflow from feature/DATA-2826 to develop

* commit '5fce5f558dee437a7807e0878acca5fd8b408766': (36 commits)
  DATA-2826: update changelog for 1.8.1-up1.5.0
  [AIRFLOW-970] Load latest_runs on homepage async
  [AIRFLOW-XXX] Fix merge issue with test/models.py by adding execution_date
  [AIRFLOW-XXX] Set version to 1.8.1
  [AIRFLOW-492] Make sure stat updates cannot fail a task
  [AIRFLOW-1142] Do not reset orphaned state for backfills
  [AIRFLOW-XXX] Bump version to 1.8.1rc1+incubating
  [AIRFLOW-1138] Add missing licenses to files in scripts directory
  [AIRFLOW-1127] Move license notices to LICENSE
  [AIRFLOW-1121][AIRFLOW-1004] Fix `airflow webserver --pid` to write out pid file
  [AIRFLOW-1124] Do not set all tasks to scheduled in backfill
  [AIRFLOW-1120] Update version view to include Apache prefix
  [AIRFLOW-XXX] Set 1.8.1 version
  [AIRFLOW-1000] Rebrand distribution to Apache Airflow
  [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor
  [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
  [AIRFLOW-1001] Fix landing times if there is no following schedule
  [AIRFLOW-111] Include queued tasks in scheduler concurrency check
  [AIRFLOW-1035] Use binary exponential backoff
  [AIRFLOW-1085] Enhance the SparkSubmitOperator
  ...
Daniel Huang committed May 15, 2017
2 parents 9faec8b + 5fce5f5 commit 569365e
Showing 56 changed files with 2,142 additions and 730 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -13,6 +13,7 @@ docs
dist
build
airflow.egg-info
apache_airflow.egg-info
.idea
metastore_db
.*sql
343 changes: 343 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

17 changes: 1 addition & 16 deletions NOTICE
@@ -1,20 +1,5 @@
Apache Airflow
Copyright 2011-2016 The Apache Software Foundation
Copyright 2011-2017 The Apache Software Foundation

This product includes software developed at The Apache Software
Foundation (http://www.apache.org/).

This product includes jQuery (http://jquery.org - MIT license), Copyright © 2016, John Resig.
This product includes Parallel Coordinates (https://syntagmatic.github.io/parallel-coordinates), Copyright (c) 2012, Kai Chang.
This product includes WebGL-2D.js (https://github.com/gameclosure/webgl-2d), Copyright (c) 2010 Corban Brook.
This product includes Bootstrap (http://getbootstrap.com - MIT license), Copyright (c) 2011-2016 Twitter, Inc.
This product includes Bootstrap Toggle (http://www.bootstraptoggle.com - MIT license), Copyright 2014 Min Hur, The New York Times Company.
This product includes Clock plugin (https://github.com/Lwangaman/jQuery-Clock-Plugin - Dual licensed under the MIT and GPL licenses), Copyright (c) 2010 John R D'Orazio ([email protected])
This product includes DataTables (datatables.net - MIT License), Copyright © 2008-2015 SpryMedia Ltd.
This product includes Underscore.js (http://underscorejs.org - MIT license), Copyright (c) 2011-2013 Jeremy Ashkenas, DocumentCloud and Investigative Reporters & Editors.
This product includes FooTable (http://fooplugins.com/plugins/footable-jquery/ - MIT license), Copyright 2013 Steven Usher & Brad Vincent.
This product includes dagre (https://github.com/cpettitt/dagre - MIT license), Copyright (c) 2012-2014 Chris Pettitt.
This product includes d3js (https://d3js.org/ - BSD License), Copyright (c) 2010-2016, Michael Bostock.
This product includes flask-kerberos (https://github.com/mkomitee/flask-kerberos - BSD License), Copyright (c) 2013, Michael Komitee
This product includes ace (https://github.com/ajaxorg/ace - BSD License), Copyright (c) 2010, Ajax.org B.V.
This product includes d3 tip (https://github.com/Caged/d3-tip - MIT License), Copyright (c) 2013-2017 Justin Palmer
65 changes: 54 additions & 11 deletions airflow/bin/cli.py
@@ -769,9 +769,14 @@ def webserver(args):
"Starting the web server on port {0} and host {1}.".format(
args.port, args.hostname))
app.run(debug=True, port=args.port, host=args.hostname,
ssl_context=(ssl_cert, ssl_key))
ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
else:
pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
if args.daemon:
handle = setup_logging(log_file)
stdout = open(stdout, 'w+')
stderr = open(stderr, 'w+')

print(
textwrap.dedent('''\
Running the Gunicorn Server with:
@@ -800,29 +805,67 @@ def webserver(args):
run_args += ['--error-logfile', str(args.error_logfile)]

if args.daemon:
run_args += ["-D"]
run_args += ['-D']

if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

run_args += ["airflow.www.app:cached_app()"]

env = os.environ.copy()
gunicorn_master_proc = subprocess.Popen(run_args, env=env)
gunicorn_master_proc = None

def kill_proc(dummy_signum, dummy_frame):
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
sys.exit(0)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)
def monitor_gunicorn(gunicorn_master_proc):
# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
else:
while True:
time.sleep(1)

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
if args.daemon:
base, ext = os.path.splitext(pid)
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
signal_map={
signal.SIGINT: kill_proc,
signal.SIGTERM: kill_proc
},
)
with ctx:
subprocess.Popen(run_args)

# Reading pid file directly, since Popen#pid doesn't
# seem to return the right value with DaemonContext.
while True:
try:
with open(pid) as f:
gunicorn_master_proc_pid = int(f.read())
break
except IOError:
logging.debug("Waiting for gunicorn's pid file to be created.")
time.sleep(0.1)

gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)

stdout.close()
stderr.close()
else:
while True:
time.sleep(1)
gunicorn_master_proc = subprocess.Popen(run_args, env=env)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)

monitor_gunicorn(gunicorn_master_proc)


def scheduler(args):
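
The daemon branch in the cli.py hunk above detaches a monitor process with python-daemon, lets gunicorn write its own pid file, and then attaches to that pid with psutil rather than trusting Popen.pid. A minimal, standalone sketch of that daemonize-then-monitor pattern follows; the pid-file paths, the gunicorn command line, and the module name myapp:app are placeholders, not taken from the commit.

import signal
import subprocess
import sys
import time

import daemon
import psutil
from lockfile.pidlockfile import TimeoutPIDLockFile

# Placeholder paths -- assumptions for this sketch, not taken from the commit.
CHILD_PID_FILE = '/tmp/example-child.pid'
MONITOR_PID_FILE = '/tmp/example-child-monitor.pid'

child = None  # set once the child's pid file appears


def kill_proc(signum, frame):
    # Forward termination to the monitored child, then exit the monitor.
    if child is not None:
        child.terminate()
        child.wait()
    sys.exit(0)


def main(run_args):
    global child
    ctx = daemon.DaemonContext(
        pidfile=TimeoutPIDLockFile(MONITOR_PID_FILE, -1),
        signal_map={signal.SIGINT: kill_proc, signal.SIGTERM: kill_proc},
    )
    with ctx:
        # The Popen handle is ignored on purpose: the child daemonizes itself,
        # so its real master pid has to be recovered from the pid file it writes.
        subprocess.Popen(run_args)

        while True:
            try:
                with open(CHILD_PID_FILE) as f:
                    child_pid = int(f.read())
                break
            except IOError:
                time.sleep(0.1)

        child = psutil.Process(child_pid)

        # Keep the monitor alive until SIGINT/SIGTERM triggers kill_proc.
        while True:
            time.sleep(1)


if __name__ == '__main__':
    # Hypothetical child: gunicorn daemonizes itself (-D) and writes
    # CHILD_PID_FILE via --pid; 'myapp:app' is a made-up module path.
    main(['gunicorn', '-D', '--pid', CHILD_PID_FILE, 'myapp:app'])
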
32 changes: 27 additions & 5 deletions airflow/contrib/hooks/spark_submit_hook.py
@@ -13,6 +13,7 @@
# limitations under the License.
#
import logging
import os
import subprocess
import re

@@ -25,7 +26,8 @@
class SparkSubmitHook(BaseHook):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH.
It requires that the "spark-submit" binary is in the PATH or the spark_home to be
supplied.
:param conf: Arbitrary Spark configuration properties
:type conf: dict
:param conn_id: The connection id as configured in Airflow administration. When an
@@ -38,10 +40,14 @@ class SparkSubmitHook(BaseHook):
:type py_files: str
:param jars: Submit additional jars to upload and place them in executor classpath.
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
:param executor_cores: Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
:param driver_memory: Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)
:type driver_memory: str
:param keytab: Full path to the file that contains the keytab
:type keytab: str
:param principal: The name of the kerberos principal used for keytab
@@ -60,8 +66,10 @@ def __init__(self,
files=None,
py_files=None,
jars=None,
java_class=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
keytab=None,
principal=None,
name='default-name',
@@ -72,8 +80,10 @@
self._files = files
self._py_files = py_files
self._jars = jars
self._java_class = java_class
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
self._keytab = keytab
self._principal = principal
self._name = name
@@ -82,14 +92,15 @@ def __init__(self,
self._sp = None
self._yarn_application_id = None

(self._master, self._queue, self._deploy_mode) = self._resolve_connection()
(self._master, self._queue, self._deploy_mode, self._spark_home) = self._resolve_connection()
self._is_yarn = 'yarn' in self._master

def _resolve_connection(self):
# Build from connection master or default to yarn if not available
master = 'yarn'
queue = None
deploy_mode = None
spark_home = None

try:
# Master can be local, yarn, spark://HOST:PORT or mesos://HOST:PORT
@@ -105,14 +116,16 @@ def _resolve_connection(self):
queue = extra['queue']
if 'deploy-mode' in extra:
deploy_mode = extra['deploy-mode']
if 'spark-home' in extra:
spark_home = extra['spark-home']
except AirflowException:
logging.debug(
"Could not load connection string {}, defaulting to {}".format(
self._conn_id, master
)
)

return master, queue, deploy_mode
return master, queue, deploy_mode, spark_home

def get_conn(self):
pass
@@ -124,8 +137,13 @@ def _build_command(self, application):
:type application: str
:return: full command to be executed
"""
# The spark-submit binary needs to be in the path
connection_cmd = ["spark-submit"]
# If the spark_home is passed then build the spark-submit executable path using
# the spark_home; otherwise assume that spark-submit is present in the path to
# the executing user
if self._spark_home:
connection_cmd = [os.path.join(self._spark_home, 'bin', 'spark-submit')]
else:
connection_cmd = ['spark-submit']

# The url of the spark master
connection_cmd += ["--master", self._master]
@@ -145,12 +163,16 @@
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
connection_cmd += ["--executor-memory", self._executor_memory]
if self._driver_memory:
connection_cmd += ["--driver-memory", self._driver_memory]
if self._keytab:
connection_cmd += ["--keytab", self._keytab]
if self._principal:
connection_cmd += ["--principal", self._principal]
if self._name:
connection_cmd += ["--name", self._name]
if self._java_class:
connection_cmd += ["--class", self._java_class]
if self._verbose:
connection_cmd += ["--verbose"]
if self._queue:
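
With spark-home resolved from the connection's extra field, the hook can build the spark-submit path from a local Spark installation instead of requiring the binary on the PATH. A hedged sketch of how the extended hook might be driven; the connection id, the extra JSON, the Java class, the jar path, and the submit() call are illustrative assumptions rather than details taken from the commit.

from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook

# Assumed Airflow connection (the id 'spark_default' is a guess): host holds
# the master (e.g. yarn or spark://HOST:PORT) and the extra field holds JSON
# such as {"queue": "root.default", "deploy-mode": "cluster", "spark-home": "/opt/spark"}.
# With spark-home present, _build_command uses /opt/spark/bin/spark-submit.
hook = SparkSubmitHook(
    conn_id='spark_default',           # assumption
    java_class='com.example.MainApp',  # new parameter in this commit; class name is made up
    executor_cores=2,
    executor_memory='2G',
    driver_memory='1G',                # new parameter in this commit
    name='example-spark-job',
)
hook.submit('/path/to/app.jar')        # entry point and jar path are assumptions
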
13 changes: 12 additions & 1 deletion airflow/contrib/operators/spark_submit_operator.py
@@ -24,7 +24,8 @@
class SparkSubmitOperator(BaseOperator):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH.
It requires that the "spark-submit" binary is in the PATH or the spark-home is set
in the extra on the connection.
:param application: The application that submitted as a job, either jar or py file.
:type application: str
:param conf: Arbitrary Spark configuration properties
@@ -39,10 +40,14 @@ class SparkSubmitOperator(BaseOperator):
:type py_files: str
:param jars: Submit additional jars to upload and place them in executor classpath.
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
:param executor_cores: Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
:param driver_memory: Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)
:type driver_memory: str
:param keytab: Full path to the file that contains the keytab
:type keytab: str
:param principal: The name of the kerberos principal used for keytab
@@ -63,8 +68,10 @@ def __init__(self,
files=None,
py_files=None,
jars=None,
java_class=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
keytab=None,
principal=None,
name='airflow-spark',
@@ -78,8 +85,10 @@
self._files = files
self._py_files = py_files
self._jars = jars
self._java_class = java_class
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
self._keytab = keytab
self._principal = principal
self._name = name
@@ -98,8 +107,10 @@ def execute(self, context):
files=self._files,
py_files=self._py_files,
jars=self._jars,
java_class=self._java_class,
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
driver_memory=self._driver_memory,
keytab=self._keytab,
principal=self._principal,
name=self._name,
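
With java_class and driver_memory now plumbed through the operator, a DAG task submitting a JVM application might look like the sketch below; the DAG id, schedule, jar path, and class name are illustrative assumptions.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG('example_spark_submit',            # DAG id and schedule are made up
          start_date=datetime(2017, 5, 1),
          schedule_interval='@daily')

submit_job = SparkSubmitOperator(
    task_id='submit_spark_job',
    application='/path/to/app.jar',          # placeholder jar path
    java_class='com.example.MainApp',        # new in this commit; class name is made up
    executor_cores=2,
    executor_memory='2G',
    driver_memory='1G',                      # new in this commit
    name='example-spark-job',
    dag=dag,
)
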
10 changes: 7 additions & 3 deletions airflow/hooks/mssql_hook.py
@@ -18,14 +18,18 @@


class MsSqlHook(DbApiHook):
'''
"""
Interact with Microsoft SQL Server.
'''
"""

conn_name_attr = 'mssql_conn_id'
default_conn_name = 'mssql_default'
supports_autocommit = True

def __init__(self, *args, **kwargs):
super(MsSqlHook, self).__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)

def get_conn(self):
"""
Returns a mssql connection object
Expand All @@ -35,7 +39,7 @@ def get_conn(self):
server=conn.host,
user=conn.login,
password=conn.password,
database=conn.schema,
database=self.schema or conn.schema,
port=conn.port)
return conn

15 changes: 9 additions & 6 deletions airflow/hooks/mysql_hook.py
@@ -19,36 +19,39 @@


class MySqlHook(DbApiHook):
'''
"""
Interact with MySQL.
You can specify charset in the extra field of your connection
as ``{"charset": "utf8"}``. Also you can choose cursor as
``{"cursor": "SSCursor"}``. Refer to the MySQLdb.cursors for more details.
'''
"""

conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True

def __init__(self, *args, **kwargs):
super(MySqlHook, self).__init__(*args, **kwargs)
self.schema = kwargs.pop("schema", None)

def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
"passwd": conn.password or '',
"host": conn.host or 'localhost',
"db": self.schema or conn.schema or ''
}

conn_config["host"] = conn.host or 'localhost'
if not conn.port:
conn_config["port"] = 3306
else:
conn_config["port"] = int(conn.port)

conn_config["db"] = conn.schema or ''

if conn.extra_dejson.get('charset', False):
conn_config["charset"] = conn.extra_dejson["charset"]
if (conn_config["charset"]).lower() == 'utf8' or\
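
Both MsSqlHook and MySqlHook above now accept a schema keyword that overrides the schema stored on the connection, with get_conn() falling back to conn.schema when it is omitted. A brief sketch; the connection ids are the hooks' stock defaults and the database names are made up.

from airflow.hooks.mssql_hook import MsSqlHook
from airflow.hooks.mysql_hook import MySqlHook

# Per-instance schema override; get_conn() falls back to conn.schema when the
# keyword is not given.
mysql = MySqlHook(mysql_conn_id='mysql_default', schema='reporting')   # 'reporting' is made up
mssql = MsSqlHook(mssql_conn_id='mssql_default', schema='analytics')   # 'analytics' is made up

rows = mysql.get_records('SELECT 1')  # get_records is inherited from DbApiHook
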
4 changes: 2 additions & 2 deletions airflow/hooks/postgres_hook.py
@@ -19,11 +19,11 @@


class PostgresHook(DbApiHook):
'''
"""
Interact with Postgres.
You can specify ssl parameters in the extra field of your connection
as ``{"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}``.
'''
"""
conn_name_attr = 'postgres_conn_id'
default_conn_name = 'postgres_default'
supports_autocommit = True
