From 1cc99e22be8e7057eed45e2bb7234e8a19127eba Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 13 Oct 2023 23:04:10 +0200 Subject: [PATCH 1/2] Fixes #34816 Change the order of operations so that async child thread is created after forking when entering daemon context. This makes sure that the thread stays alive in the internal loop. --- airflow/cli/commands/triggerer_command.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 2288f1537fbb9..addccf99d177c 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -58,7 +58,6 @@ def triggerer(args): settings.MASK_SECRETS_IN_LOGS = True print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") - triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) if args.daemon: pid, stdout, stderr, log_file = setup_locations( @@ -77,10 +76,12 @@ def triggerer(args): umask=int(settings.DAEMON_UMASK, 8), ) with daemon_context, _serve_logs(args.skip_serve_logs): + triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) signal.signal(signal.SIGQUIT, sigquit_handler) with _serve_logs(args.skip_serve_logs): + triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) From bba231653fe33a4a8a693eeddf93a9d06de680a9 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 14 Oct 2023 00:08:52 +0200 Subject: [PATCH 2/2] format --- airflow/cli/commands/triggerer_command.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index addccf99d177c..5ddb4e23b6633 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -76,12 +76,16 @@ def triggerer(args): umask=int(settings.DAEMON_UMASK, 8), ) with daemon_context, _serve_logs(args.skip_serve_logs): - triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) + triggerer_job_runner = TriggererJobRunner( + job=Job(heartrate=triggerer_heartrate), capacity=args.capacity + ) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) signal.signal(signal.SIGQUIT, sigquit_handler) with _serve_logs(args.skip_serve_logs): - triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity) + triggerer_job_runner = TriggererJobRunner( + job=Job(heartrate=triggerer_heartrate), capacity=args.capacity + ) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)