Skip to content

Commit

Permalink
fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup j…
Browse files Browse the repository at this point in the history
…ob scheduling without explicit_defaults_for_timestamp (apache#6)

* fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* reformat

* flake8 fix
  • Loading branch information
vshshjn7 authored and aoen committed Sep 11, 2019
1 parent a182839 commit 76fe7ac
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 130 deletions.
6 changes: 6 additions & 0 deletions .arcconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"arc.feature.start.default": "origin/twtr_rb_1.10.0",
"arc.land.onto.default": "twtr_rb_1.10.0",
"base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
"history.immutable": false
}
20 changes: 20 additions & 0 deletions README_TWITTER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Developing locally

Here are some steps to develop this dependency locally and interact with source, interpreted from
https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source

1. Create a git branch for this change.
2. Edit `airflow/version.py` to change the version.
3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
4. Run the command `python2.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
It will be written to `airflow/dist`.
5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
7. Run `kill $pid` where `$pid` is the pid just observed.
8. From the `source` directory, run `./pants clean-all`.
9. Now here are the hacky parts. The `run-local.sh` and `run-aurora.sh` scripts both run pants commands
without the option `--python-repos-repos`. You can either edit these to include this option,
or run a pants command that includes it, which will cache the local artifact you need, e.g.
`./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
10. Now you can start up airflow instances as usual with the newly built wheel!
11. See the above link for `Adding Dependencies to science-libraries`.
7 changes: 6 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
# actually enqueue them
for simple_task_instance in simple_task_instances:
simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)

path = simple_dag.full_filepath
if path.startswith(settings.DAGS_FOLDER):
path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)

command = TI.generate_command(
simple_task_instance.dag_id,
simple_task_instance.task_id,
Expand All @@ -1106,7 +1111,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
ignore_task_deps=False,
ignore_ti_state=False,
pool=simple_task_instance.pool,
file_path=simple_dag.full_filepath,
file_path=path,
pickle_id=simple_dag.pickle_id)

priority = simple_task_instance.priority_weight
Expand Down
227 changes: 106 additions & 121 deletions airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

from alembic import op
from sqlalchemy.dialects import mysql
from sqlalchemy import text
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "0e2a74e0fc9f"
down_revision = "d2ae31099d61"
Expand All @@ -38,128 +40,111 @@ def upgrade():
conn = op.get_bind()
if conn.dialect.name == "mysql":
conn.execute("SET time_zone = '+00:00'")
cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
res = cur.fetchall()
if res[0][0] == 0:
raise Exception(
"Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
)

op.alter_column(
table_name="chart",
column_name="last_modified",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="dag",
column_name="last_scheduler_run",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="dag", column_name="last_pickled", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="dag", column_name="last_expired", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="dag_pickle",
column_name="created_dttm",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="dag_run",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="dag_run", column_name="start_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="dag_run", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)
# @awilcox July 2018
# we only need to worry about explicit_defaults_for_timestamp if we have
# DATETIME columns that are NOT explicitly declared with NULL
# ... and we don't, all are explicit

# cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
# res = cur.fetchall()
# if res[0][0] == 0:
# raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1)
# for mysql")

op.alter_column(table_name='chart', column_name='last_modified',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='dag', column_name='last_scheduler_run',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='dag_pickle', column_name='created_dttm',
type_=mysql.TIMESTAMP(fsp=6))

# NOTE(kwilson): See below.
op.alter_column(table_name='dag_run', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False,
server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='dag_run', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='import_error', column_name='timestamp',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='job', column_name='latest_heartbeat',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='known_event', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='known_event', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='log', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='sla_miss', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='task_fail', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))

# NOTE(kwilson)
#
# N.B. Here (and above) we explicitly set a default to the string literal
# `CURRENT_TIMESTAMP(6)` to avoid the
# default MySQL behavior for TIMESTAMP without `explicit_defaults_for_timestamp` turned
# on as stated here:
#
# "The first TIMESTAMP column in a table, if not explicitly declared with the NULL
# attribute or an explicit
# DEFAULT or ON UPDATE attribute, is automatically declared with the DEFAULT
# CURRENT_TIMESTAMP and
# ON UPDATE CURRENT_TIMESTAMP attributes." [0]
#
# Because of the "ON UPDATE CURRENT_TIMESTAMP" default, anytime the `task_instance` table
# is UPDATE'd without
# explicitly re-passing the current value for the `execution_date` column, it will end up
# getting clobbered with
# the current timestamp value which breaks `dag_run` <-> `task_instance` alignment and
# causes all sorts of
# scheduler and DB integrity breakage (because `execution_date` is part of the primary key).
#
# We unfortunately cannot turn `explicit_defaults_for_timestamp` on globally ourselves as
# is now technically
# required by Airflow [1], because this has to be set in the my.cnf and we don't control
# that in managed MySQL.
# A request to enable this fleet-wide has been made in MVP-18609.
#
# [0]: https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
# #sysvar_explicit_defaults_for_timestamp
# [1]: https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#mysql-setting
# -required

op.alter_column(table_name='task_instance', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False,
server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='task_instance', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='xcom', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(
table_name="import_error",
column_name="timestamp",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="job", column_name="start_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="job", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="job",
column_name="latest_heartbeat",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="log", column_name="dttm", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="log", column_name="execution_date", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="sla_miss",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
nullable=False,
)
op.alter_column(
table_name="sla_miss", column_name="timestamp", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="task_fail",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_fail",
column_name="start_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_fail", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="task_instance",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
nullable=False,
)
op.alter_column(
table_name="task_instance",
column_name="start_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_instance",
column_name="end_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_instance",
column_name="queued_dttm",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="xcom", column_name="timestamp", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="xcom",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
else:
# sqlite and mssql datetime are fine as is. Therefore, not converting
if conn.dialect.name in ("sqlite", "mssql"):
Expand Down
15 changes: 8 additions & 7 deletions airflow/security/kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ def renew_from_kt(principal, keytab):
sys.exit(subp.returncode)

global NEED_KRB181_WORKAROUND
if NEED_KRB181_WORKAROUND is None:
NEED_KRB181_WORKAROUND = detect_conf_var()
if NEED_KRB181_WORKAROUND:
# (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# renew the ticket after the initial valid time.
time.sleep(1.5)
perform_krb181_workaround(principal)
# This breaks for twitter as we dont issue renewable tickets
# if NEED_KRB181_WORKAROUND is None:
# NEED_KRB181_WORKAROUND = detect_conf_var()
# if NEED_KRB181_WORKAROUND:
# # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# # renew the ticket after the initial valid time.
# time.sleep(1.5)
# perform_krb181_workaround()


def perform_krb181_workaround(principal):
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.4'
version = '1.10.4+twtr'

0 comments on commit 76fe7ac

Please sign in to comment.