From fb64f2ee590ccfd8656dfc163ff2bed41e821571 Mon Sep 17 00:00:00 2001
From: Kris Wilson
Date: Thu, 23 Aug 2018 18:08:46 -0700
Subject: [PATCH] [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup
 job scheduling without explicit_defaults_for_timestamp.

---
 .arcconfig                                   |  6 +++
 README_TWITTER.md                            | 20 +++++++++
 airflow/jobs.py                              |  7 +++-
 .../0e2a74e0fc9f_add_time_zone_awareness.py  | 42 ++++++++++++++++---
 airflow/security/kerberos.py                 | 15 +++----
 airflow/version.py                           |  2 +-
 6 files changed, 77 insertions(+), 15 deletions(-)
 create mode 100644 .arcconfig
 create mode 100644 README_TWITTER.md

diff --git a/.arcconfig b/.arcconfig
new file mode 100644
index 0000000000000..2c9c1a3db12a5
--- /dev/null
+++ b/.arcconfig
@@ -0,0 +1,6 @@
+{
+  "arc.feature.start.default": "origin/twtr_rb_1.10.0",
+  "arc.land.onto.default": "twtr_rb_1.10.0",
+  "base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
+  "history.immutable": false
+}
diff --git a/README_TWITTER.md b/README_TWITTER.md
new file mode 100644
index 0000000000000..ffd8c8299c68f
--- /dev/null
+++ b/README_TWITTER.md
@@ -0,0 +1,20 @@
+# Developing locally
+
+Here are the steps to develop this dependency locally and interact with source, adapted from
+https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source
+
+1. Create a git branch for this change.
+2. Edit `airflow/version.py` to change the version.
+3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
+4. Run `python2.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
+   It will be written to `airflow/dist`.
+5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
+6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
+7. Run `kill $pid`, where `$pid` is the pid just observed.
+8. From the `source` directory, run `./pants clean-all`.
+9. Now for the hacky part: `run-local.sh` and `run-aurora.sh` both run pants commands
+   without the `--python-repos-repos` option. Either edit those scripts to include the option,
+   or run any pants command that includes it (this caches the local artifact you need), e.g.
+   `./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
+10. You can now start up airflow instances as usual with the newly built wheel!
+11. See the link above for `Adding Dependencies to science-libraries`.
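Steps 4-8 of the README lend themselves to a small helper script. The following is a minimal sketch that chains those commands; the checkout locations are placeholders, and `pkill -f pantsd` stands in for the manual `ps`/`kill` of steps 6-7. It is an illustration, not part of the patch.

```python
#!/usr/bin/env python
import os
import shutil
import subprocess

AIRFLOW_DIR = os.path.expanduser("~/airflow")  # placeholder checkout location
SOURCE_DIR = os.path.expanduser("~/source")    # placeholder checkout location

# Step 4: build the wheel into airflow/dist.
subprocess.check_call(["python2.7", "setup.py", "bdist_wheel"], cwd=AIRFLOW_DIR)

# Step 5: clean out the pex and pants caches.
for cache in ("~/.pex", "~/.cache/pants"):
    shutil.rmtree(os.path.expanduser(cache), ignore_errors=True)

# Steps 6-7: stop any running pantsd daemon (ignore the exit code if none is running).
subprocess.call("pkill -f pantsd", shell=True)

# Step 8: clean all pants state from the source repo.
subprocess.check_call(["./pants", "clean-all"], cwd=SOURCE_DIR)
```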
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 88aa643c50152..83d459fc348be 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1317,6 +1317,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
         # actually enqueue them
         for task_instance in task_instances:
             simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
+
+            path = simple_dag.full_filepath
+            if path.startswith(settings.DAGS_FOLDER):
+                path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)
+
             command = " ".join(TI.generate_command(
                 task_instance.dag_id,
                 task_instance.task_id,
@@ -1328,7 +1333,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
                 ignore_task_deps=False,
                 ignore_ti_state=False,
                 pool=task_instance.pool,
-                file_path=simple_dag.full_filepath,
+                file_path=path,
                 pickle_id=simple_dag.pickle_id))
             priority = task_instance.priority_weight
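The substitution above keeps the generated task command portable: instead of embedding the scheduler host's absolute path, the file path carries the literal `DAGS_FOLDER` prefix, which the consuming host expands back against its own configured dags folder (in 1.10 the CLI's `process_subdir()` performs that reverse replacement). A standalone sketch of the same logic; the helper name and example paths are illustrative, not part of the patch.

```python
def relativize_dag_path(full_filepath, dags_folder):
    """Illustrative helper mirroring the logic added to
    _enqueue_task_instances_with_queued_state(): swap a machine-specific
    DAGS_FOLDER prefix for the literal "DAGS_FOLDER" token so the command
    resolves on any host, whatever its dags_folder setting is."""
    if full_filepath.startswith(dags_folder):
        return full_filepath.replace(dags_folder, "DAGS_FOLDER", 1)
    return full_filepath


# Example paths are made up for illustration.
print(relativize_dag_path("/var/lib/airflow/dags/etl/my_dag.py",
                          "/var/lib/airflow/dags"))
# -> DAGS_FOLDER/etl/my_dag.py
```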
diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
index 64ee41c44d9e2..3eae1206641e5 100644
--- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
+++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -33,6 +33,7 @@
 from alembic import op
 from sqlalchemy.dialects import mysql
+from sqlalchemy import text
 import sqlalchemy as sa
@@ -40,10 +41,15 @@ def upgrade():
     conn = op.get_bind()
     if conn.dialect.name == 'mysql':
         conn.execute("SET time_zone = '+00:00'")
-        cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
-        res = cur.fetchall()
-        if res[0][0] == 0:
-            raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
+        # @awilcox July 2018
+        # we only need to worry about explicit_defaults_for_timestamp if we have
+        # DATETIME columns that are NOT explicitly declared with NULL
+        # ... and we don't, all are explicit
+
+        # cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+        # res = cur.fetchall()
+        # if res[0][0] == 0:
+        #     raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")

         op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))
@@ -53,7 +59,9 @@
         op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))
-        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+        # NOTE(kwilson): See below.
+        op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                        nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
         op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
@@ -76,7 +84,29 @@
         op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
-        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+        # NOTE(kwilson)
+        #
+        # N.B. Here (and above) we explicitly set the default to the string literal `CURRENT_TIMESTAMP(6)` to avoid
+        #      the default MySQL behavior for TIMESTAMP columns without `explicit_defaults_for_timestamp` turned on,
+        #      as stated here:
+        #
+        #      "The first TIMESTAMP column in a table, if not explicitly declared with the NULL attribute or an
+        #       explicit DEFAULT or ON UPDATE attribute, is automatically declared with the DEFAULT CURRENT_TIMESTAMP
+        #       and ON UPDATE CURRENT_TIMESTAMP attributes." [0]
+        #
+        # Because of the "ON UPDATE CURRENT_TIMESTAMP" default, any time the `task_instance` table is UPDATE'd
+        # without explicitly re-passing the current value of the `execution_date` column, that column gets clobbered
+        # with the current timestamp, which breaks `dag_run` <-> `task_instance` alignment and causes all sorts of
+        # scheduler and DB integrity breakage (because `execution_date` is part of the primary key).
+        #
+        # We unfortunately cannot turn `explicit_defaults_for_timestamp` on globally ourselves, as is now technically
+        # required by Airflow [1], because it has to be set in my.cnf and we don't control that in managed MySQL.
+        # A request to enable it fleet-wide has been made in MVP-18609.
+        #
+        # [0]: https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html#sysvar_explicit_defaults_for_timestamp
+        # [1]: https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#mysql-setting-required
+
+        op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                        nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
         op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
         op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 43c9fcccdca85..0c0dcf534cb0e 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -60,13 +60,14 @@ def renew_from_kt():
         sys.exit(subp.returncode)

     global NEED_KRB181_WORKAROUND
-    if NEED_KRB181_WORKAROUND is None:
-        NEED_KRB181_WORKAROUND = detect_conf_var()
-    if NEED_KRB181_WORKAROUND:
-        # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
-        # renew the ticket after the initial valid time.
-        time.sleep(1.5)
-        perform_krb181_workaround()
+    # This breaks for Twitter, as we don't issue renewable tickets.
+    # if NEED_KRB181_WORKAROUND is None:
+    #     NEED_KRB181_WORKAROUND = detect_conf_var()
+    # if NEED_KRB181_WORKAROUND:
+    #     # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
+    #     # renew the ticket after the initial valid time.
+    #     time.sleep(1.5)
+    #     perform_krb181_workaround()


 def perform_krb181_workaround():
diff --git a/airflow/version.py b/airflow/version.py
index be4038b4af24d..8629bbdb0c4d1 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -18,4 +18,4 @@
 # under the License.
 #

-version = '1.10.0'
+version = '1.10.0+twtr5'
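As a postscript to the NOTE(kwilson) comment in the migration above, the effect of the explicit `server_default` can be sanity-checked after the migration runs by inspecting the generated DDL: `execution_date` should carry `DEFAULT CURRENT_TIMESTAMP(6)` and no implicit `ON UPDATE CURRENT_TIMESTAMP` clause. A minimal verification sketch, with a placeholder connection URI; this check is an illustration, not part of the patch.

```python
from sqlalchemy import create_engine, text

# Placeholder URI: point at the Airflow metadata DB the migration ran against.
engine = create_engine("mysql://airflow:airflow@localhost/airflow")

with engine.connect() as conn:
    # SHOW CREATE TABLE returns (table_name, ddl).
    ddl = conn.execute(text("SHOW CREATE TABLE task_instance")).fetchone()[1]
    for line in ddl.splitlines():
        if "`execution_date`" in line:
            print(line.strip())
            # Expected: an explicit default and no implicit ON UPDATE clause.
            assert "DEFAULT CURRENT_TIMESTAMP(6)" in line
            assert "ON UPDATE" not in line
```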