Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send context using in venv operator #41039

Merged
merged 44 commits into from
Aug 8, 2024
Merged

Conversation

phi-friday
Copy link
Contributor

@phi-friday phi-friday commented Jul 26, 2024

related: #34158

allow airflow.operators.python.get_current_context() in venv

limits:

  • need airflow in venv

sample dag script:

from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def context_sample():
    @task.virtualenv(use_airflow_context=True)
    def print_context() -> None:
        from pprint import pprint

        from airflow.operators.python import get_current_context

        context = get_current_context()
        pprint(context)

    print_context()


context_sample()

output:

c8f1266ef705
*** Found local files:
***   * /root/airflow/logs/dag_id=context_sample/run_id=manual__2024-08-08T16:37:54.976605+00:00/task_id=print_context/attempt=1.log
[2024-08-08, 16:37:55 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-08-08, 16:37:55 UTC] {process_utils.py:186} INFO - Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venvhzgff1ag --system-site-packages --python=python
[2024-08-08, 16:37:55 UTC] {process_utils.py:190} INFO - Output:
[2024-08-08, 16:37:56 UTC] {process_utils.py:194} INFO - created virtual environment CPython3.8.19.final.0-64 in 296ms
[2024-08-08, 16:37:56 UTC] {process_utils.py:194} INFO -   creator CPython3Posix(dest=/tmp/venvhzgff1ag, clear=False, no_vcs_ignore=False, global=True)
[2024-08-08, 16:37:56 UTC] {process_utils.py:194} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
[2024-08-08, 16:37:56 UTC] {process_utils.py:194} INFO -     added seed packages: pip==24.1, setuptools==70.1.0, wheel==0.43.0
[2024-08-08, 16:37:56 UTC] {process_utils.py:194} INFO -   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
[2024-08-08, 16:37:56 UTC] {process_utils.py:186} INFO - Executing cmd: /tmp/venvhzgff1ag/bin/pip install -r /tmp/venvhzgff1ag/requirements.txt
[2024-08-08, 16:37:56 UTC] {process_utils.py:190} INFO - Output:
[2024-08-08, 16:37:59 UTC] {process_utils.py:194} INFO - 
[2024-08-08, 16:37:59 UTC] {process_utils.py:194} INFO - [notice] A new release of pip is available: 24.1 -> 24.2
[2024-08-08, 16:37:59 UTC] {process_utils.py:194} INFO - [notice] To update, run: python -m pip install --upgrade pip
[2024-08-08, 16:38:00 UTC] {process_utils.py:186} INFO - Executing cmd: /tmp/venvhzgff1ag/bin/python /tmp/venv-callt3nisr22/script.py /tmp/venv-callt3nisr22/script.in /tmp/venv-callt3nisr22/script.out /tmp/venv-callt3nisr22/string_args.txt /tmp/venv-callt3nisr22/termination.log /tmp/venv-callt3nisr22/***_context.json
[2024-08-08, 16:38:00 UTC] {process_utils.py:190} INFO - Output:
[2024-08-08, 16:38:03 UTC] {process_utils.py:194} INFO - {'conf': '<***.configuration.AirflowConfigParser object at 0xf04f9ce6f9d0>', 'dag': <DAG: context_sample>, 'dag_run': DagRunPydantic(id=25, dag_id='context_sample', queued_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 1087, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 8, 8, 16, 37, 55, 187374, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-08-08T16:37:54.976605+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 8, 8, 16, 37, 55, 251889, tzinfo=TzInfo(UTC)), dag_hash='3fa8d0424231fccac18a9ca4d57850c5', updated_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 253029, tzinfo=TzInfo(UTC)), dag=<DAG: context_sample>, consumed_dataset_events=[], log_template_id=2), 'data_interval_end': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'outlet_events': <***.utils.context.OutletEventAccessors object at 0xf01c55562df0>, 'ds': '2024-08-08', 'ds_nodash': '20240808', 'execution_date': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'expanded_ti_count': None, 'inlets': [], 'inlet_events': 'InletEventsAccessors(_inlets=[], _datasets={}, _dataset_aliases={}, _session=<sqlalchemy.orm.session.Session object at 0xf04f91c52d60>)', 'logical_date': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/opt/***/***/macros/__init__.py'>, 'map_index_template': None, 'next_ds': '2024-08-08', 'next_ds_nodash': '20240808', 'next_execution_date': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 'prev_data_interval_start_success': DateTime(2024, 8, 8, 14, 23, 57, 572148, tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2024, 8, 8, 14, 23, 57, 572148, tzinfo=Timezone('UTC')), 'prev_ds': '2024-08-08', 'prev_ds_nodash': '20240808', 'prev_execution_date': DateTime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=Timezone('UTC')), 'prev_execution_date_success': DateTime(2024, 8, 8, 14, 23, 57, 572148, tzinfo=Timezone('UTC')), 'prev_start_date_success': DateTime(2024, 8, 8, 14, 23, 58, 282287, tzinfo=Timezone('UTC')), 'prev_end_date_success': DateTime(2024, 8, 8, 14, 24, 8, 15923, tzinfo=Timezone('UTC')), 'run_id': 'manual__2024-08-08T16:37:54.976605+00:00', 'task_instance': TaskInstancePydantic(task_id='print_context', dag_id='context_sample', run_id='manual__2024-08-08T16:37:54.976605+00:00', map_index=-1, start_date=datetime.datetime(2024, 8, 8, 16, 37, 55, 408506, tzinfo=TzInfo(UTC)), end_date=None, execution_date=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), duration=None, state='running', try_number=1, max_tries=0, hostname='c8f1266ef705', unixname='root', job_id=75, pool='default_pool', pool_slots=1, queue='default', priority_weight=1, operator='_PythonVirtualenvDecoratedOperator', custom_operator_name='@task.virtualenv', queued_dttm=datetime.datetime(2024, 8, 8, 16, 37, 55, 222750, tzinfo=TzInfo(UTC)), queued_by_job_id=73, pid=353, executor=None, executor_config={}, updated_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 462200, tzinfo=TzInfo(UTC)), rendered_map_index=None, external_executor_id=None, trigger_id=None, trigger_timeout=None, next_method=None, next_kwargs=None, run_as_user=None, task=<Task(_PythonVirtualenvDecoratedOperator): print_context>, test_mode=False, dag_run=DagRunPydantic(id=25, dag_id='context_sample', queued_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 1087, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 8, 8, 16, 37, 55, 187374, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-08-08T16:37:54.976605+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 8, 8, 16, 37, 55, 251889, tzinfo=TzInfo(UTC)), dag_hash='3fa8d0424231fccac18a9ca4d57850c5', updated_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 253029, tzinfo=TzInfo(UTC)), dag=<DAG: context_sample>, consumed_dataset_events=[], log_template_id=2), dag_model=DagModelPydantic(dag_id='context_sample', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 8, 8, 16, 37, 33, 404807, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/files/dags/context_sample.py', processor_subdir='/files/dags', owners='***', description=None, default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False), raw=True, is_trigger_log_context=False), 'task_instance_key_str': 'context_sample__print_context__20240808', 'test_mode': False, 'ti': TaskInstancePydantic(task_id='print_context', dag_id='context_sample', run_id='manual__2024-08-08T16:37:54.976605+00:00', map_index=-1, start_date=datetime.datetime(2024, 8, 8, 16, 37, 55, 408506, tzinfo=TzInfo(UTC)), end_date=None, execution_date=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), duration=None, state='running', try_number=1, max_tries=0, hostname='c8f1266ef705', unixname='root', job_id=75, pool='default_pool', pool_slots=1, queue='default', priority_weight=1, operator='_PythonVirtualenvDecoratedOperator', custom_operator_name='@task.virtualenv', queued_dttm=datetime.datetime(2024, 8, 8, 16, 37, 55, 222750, tzinfo=TzInfo(UTC)), queued_by_job_id=73, pid=353, executor=None, executor_config={}, updated_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 462200, tzinfo=TzInfo(UTC)), rendered_map_index=None, external_executor_id=None, trigger_id=None, trigger_timeout=None, next_method=None, next_kwargs=None, run_as_user=None, task=<Task(_PythonVirtualenvDecoratedOperator): print_context>, test_mode=False, dag_run=DagRunPydantic(id=25, dag_id='context_sample', queued_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 1087, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 8, 8, 16, 37, 55, 187374, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-08-08T16:37:54.976605+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 8, 8, 16, 37, 54, 976605, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 8, 8, 16, 37, 55, 251889, tzinfo=TzInfo(UTC)), dag_hash='3fa8d0424231fccac18a9ca4d57850c5', updated_at=datetime.datetime(2024, 8, 8, 16, 37, 55, 253029, tzinfo=TzInfo(UTC)), dag=<DAG: context_sample>, consumed_dataset_events=[], log_template_id=2), dag_model=DagModelPydantic(dag_id='context_sample', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 8, 8, 16, 37, 33, 404807, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/files/dags/context_sample.py', processor_subdir='/files/dags', owners='***', description=None, default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False), raw=True, is_trigger_log_context=False), 'tomorrow_ds': '2024-08-09', 'tomorrow_ds_nodash': '20240809', 'triggering_dataset_events': {}, 'ts': '2024-08-08T16:37:54.976605+00:00', 'ts_nodash': '20240808T163754', 'ts_nodash_with_tz': '20240808T163754.976605+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2024-08-07', 'yesterday_ds_nodash': '20240807', 'task': <Task(_PythonVirtualenvDecoratedOperator): print_context>}
[2024-08-08, 16:38:03 UTC] {python.py:242} INFO - Done. Returned value was: None
[2024-08-08, 16:38:03 UTC] {taskinstance.py:340} ▶ Post task execution logs

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Jul 26, 2024
@phi-friday
Copy link
Contributor Author

The warning RemovedInAirflow3Warning is consistently occurring.
What can I modify to get it to pass...

@potiuk
Copy link
Member

potiuk commented Jul 26, 2024

The warning RemovedInAirflow3Warning is consistently occurring. What can I modify to get it to pass...

https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#handling-warnings

@phi-friday
Copy link
Contributor Author

The warning RemovedInAirflow3Warning is consistently occurring. What can I modify to get it to pass...

https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#handling-warnings

The warning seems to be occurring in a place that seems unrelated to the area I modified.

I'm not sure where to look, so can you give me a little hint?

self = <airflow.utils.decorators._autostacklevel_warn object at 0xf74179d49cd0>
message = '`soft_fail` is deprecated and will be removed in a future version. Please provide skip_policy=SkipPolicy.skip_on_soft_error instead.'
category = <class 'airflow.exceptions.RemovedInAirflow3Warning'>, stacklevel = 3
source = None

    def warn(self, message, category=None, stacklevel=1, source=None):
>       self.warnings.warn(message, category, stacklevel + 2, source)
E       airflow.exceptions.RemovedInAirflow3Warning: `soft_fail` is deprecated and will be removed in a future version. Please provide skip_policy=SkipPolicy.skip_on_soft_error instead.

airflow/utils/decorators.py:114: RemovedInAirflow3Warning
_________ TestSFTPSensor.test_sftp_failure[True-AirflowSkipException] __________

self = <tests.providers.sftp.sensors.test_sftp.TestSFTPSensor object at 0xf7414ac87100>
sftp_hook_mock = <MagicMock name='SFTPHook' id='271858442834320'>
soft_fail = True
expected_exception = <class 'airflow.exceptions.AirflowSkipException'>

    @pytest.mark.parametrize(
        "soft_fail, expected_exception", ((False, OSError), (True, AirflowSkipException))
    )
    @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
    def test_sftp_failure(self, sftp_hook_mock, soft_fail: bool, expected_exception):
        sftp_hook_mock.return_value.get_mod_time.side_effect = OSError(SFTP_FAILURE, "SFTP failure")
>       sftp_sensor = SFTPSensor(
            task_id="unit_test", path="/path/to/file/1970-01-01.txt", soft_fail=soft_fail
        )

tests/providers/sftp/sensors/test_sftp.py:61: 

@potiuk
Copy link
Member

potiuk commented Jul 26, 2024

Always Be Rebased - this has been solved in main. Generally when in doubt, always rebase your PR. Your PR is 10 commits behind already:

image

@phi-friday
Copy link
Contributor Author

sample dag script:

from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def context_sample():
    @task.virtualenv(use_airflow_context=True)
    def print_context() -> None:
        from pprint import pprint

        from airflow.operators.python import get_current_context

        context = get_current_context()
        pprint(context)

    print_context()


context_sample()

output:

*** Found local files:
***   * /root/airflow/logs/dag_id=context_sample/run_id=manual__2024-07-26T14:57:31.727299+00:00/task_id=print_context/attempt=1.log
[2024-07-26, 14:57:32 UTC] {local_task_job_runner.py:123} ▼ Pre task execution logs
[2024-07-26, 14:57:32 UTC] {taskinstance.py:2559} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: context_sample.print_context manual__2024-07-26T14:57:31.727299+00:00 [queued]>
[2024-07-26, 14:57:32 UTC] {taskinstance.py:2559} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: context_sample.print_context manual__2024-07-26T14:57:31.727299+00:00 [queued]>
[2024-07-26, 14:57:32 UTC] {taskinstance.py:2812} INFO - Starting attempt 1 of 1
[2024-07-26, 14:57:33 UTC] {taskinstance.py:2835} INFO - Executing <Task(_PythonVirtualenvDecoratedOperator): print_context> on 2024-07-26 14:57:31.727299+00:00
[2024-07-26, 14:57:33 UTC] {standard_task_runner.py:72} INFO - Started process 307 to run task
[2024-07-26, 14:57:33 UTC] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'context_sample', 'print_context', 'manual__2024-07-26T14:57:31.727299+00:00', '--job-id', '3', '--raw', '--subdir', 'DAGS_FOLDER/context_sample.py', '--cfg-path', '/tmp/tmp8levtm38']
[2024-07-26, 14:57:33 UTC] {standard_task_runner.py:105} INFO - Job 3: Subtask print_context
[2024-07-26, 14:57:33 UTC] {task_command.py:467} INFO - Running <TaskInstance: context_sample.print_context manual__2024-07-26T14:57:31.727299+00:00 [running]> on host 2393e38fe647
[2024-07-26, 14:57:33 UTC] {taskinstance.py:3079} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='context_sample' AIRFLOW_CTX_TASK_ID='print_context' AIRFLOW_CTX_EXECUTION_DATE='2024-07-26T14:57:31.727299+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-26T14:57:31.727299+00:00'
[2024-07-26, 14:57:33 UTC] {taskinstance.py:732} ▲▲▲ Log group end
[2024-07-26, 14:57:33 UTC] {process_utils.py:183} INFO - Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venvbvs9imwj --system-site-packages --python=python
[2024-07-26, 14:57:33 UTC] {process_utils.py:187} INFO - Output:
[2024-07-26, 14:57:33 UTC] {process_utils.py:191} INFO - created virtual environment CPython3.9.19.final.0-64 in 306ms
[2024-07-26, 14:57:33 UTC] {process_utils.py:191} INFO -   creator CPython3Posix(dest=/tmp/venvbvs9imwj, clear=False, no_vcs_ignore=False, global=True)
[2024-07-26, 14:57:33 UTC] {process_utils.py:191} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
[2024-07-26, 14:57:33 UTC] {process_utils.py:191} INFO -     added seed packages: pip==24.1, setuptools==70.1.0, wheel==0.43.0
[2024-07-26, 14:57:33 UTC] {process_utils.py:191} INFO -   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
[2024-07-26, 14:57:33 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvbvs9imwj/bin/pip install -r /tmp/venvbvs9imwj/requirements.txt
[2024-07-26, 14:57:33 UTC] {process_utils.py:187} INFO - Output:
[2024-07-26, 14:57:37 UTC] {process_utils.py:191} INFO - 
[2024-07-26, 14:57:37 UTC] {process_utils.py:191} INFO - [notice] A new release of pip is available: 24.1 -> 24.1.2
[2024-07-26, 14:57:37 UTC] {process_utils.py:191} INFO - [notice] To update, run: python -m pip install --upgrade pip
[2024-07-26, 14:57:37 UTC] {process_utils.py:183} INFO - Executing cmd: /tmp/venvbvs9imwj/bin/python /tmp/venv-callq63jm1tl/script.py /tmp/venv-callq63jm1tl/script.in /tmp/venv-callq63jm1tl/script.out /tmp/venv-callq63jm1tl/string_args.txt /tmp/venv-callq63jm1tl/termination.log /tmp/venv-callq63jm1tl/***_context.json
[2024-07-26, 14:57:37 UTC] {process_utils.py:187} INFO - Output:
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO - {'dag': {'_dag_id': 'context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          '_default_view': 'grid',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          '_processor_dags_folder': '/files/dags',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          '_task_group': {'_group_id': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'children': {'print_context': ['operator',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                         'print_context']},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'downstream_group_ids': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'downstream_task_ids': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'prefix_group_id': True,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'tooltip': '',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'ui_color': 'CornflowerBlue',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'ui_fgcolor': '#000',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'upstream_group_ids': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                          'upstream_task_ids': []},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'catchup': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'dag_dependencies': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'edge_info': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'fileloc': '/files/dags/context_sample.py',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'params': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'schedule_interval': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'start_date': 1640995200.0,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'tasks': [{'__type': 'operator',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                     '__var': {'_is_empty': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               '_log_config_logger_name': '***.task.operators',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               '_needs_expansion': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               '_operator_name': '@task.virtualenv',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               '_task_module': '***.decorators.python_virtualenv',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               '_task_type': '_PythonVirtualenvDecoratedOperator',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'downstream_task_ids': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'is_setup': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'is_teardown': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'on_failure_fail_dagrun': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'op_args': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'op_kwargs': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'pool': 'default_pool',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'requirements': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'start_from_trigger': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'start_trigger_args': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'task_id': 'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'template_ext': ['.txt'],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'template_fields': ['venv_cache_path',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                   'op_args',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                   'index_urls',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                   'requirements',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                   'templates_dict',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                   'op_kwargs'],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'template_fields_renderers': {'op_args': 'py',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                             'op_kwargs': 'py',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                                             'templates_dict': 'json'},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'ui_color': '#ffefeb',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'ui_fgcolor': '#000',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'weight_rule': 'downstream'}}],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -          'timezone': 'UTC'},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'dag_run': {'clear_number': 0,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'conf': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'creating_job_id': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'dag_hash': '331c7d69cf865719c0509d0b3d4e44f0',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'dag_id': 'context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'data_interval_end': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'data_interval_start': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'end_date': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'execution_date': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'external_trigger': True,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'id': 1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'last_scheduling_decision': '2024-07-26T14:57:32.851392+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'log_template_id': 2,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'queued_at': '2024-07-26T14:57:31.749764+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'run_id': 'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'run_type': 'manual',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'start_date': '2024-07-26T14:57:32.796911+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'state': 'running',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -              'updated_at': '2024-07-26T14:57:32.853627+00:00'},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'data_interval_end': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'data_interval_start': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ds': '2024-07-26',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ds_nodash': '20240726',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'expanded_ti_count': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'logical_date': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'map_index_template': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'params': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'prev_data_interval_end_success': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'prev_data_interval_start_success': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'prev_end_date_success': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'prev_start_date_success': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'run_id': 'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'task': {'_is_empty': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           '_log_config_logger_name': '***.task.operators',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           '_needs_expansion': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           '_operator_name': '@task.virtualenv',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           '_task_module': '***.decorators.python_virtualenv',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           '_task_type': '_PythonVirtualenvDecoratedOperator',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'downstream_task_ids': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'is_setup': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'is_teardown': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'on_failure_fail_dagrun': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'op_args': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'op_kwargs': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'pool': 'default_pool',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'requirements': [],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'start_from_trigger': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'start_trigger_args': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'task_id': 'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'template_ext': ['.txt'],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'template_fields': ['venv_cache_path',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'op_args',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'index_urls',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'requirements',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'templates_dict',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                               'op_kwargs'],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'template_fields_renderers': {'op_args': 'py',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                         'op_kwargs': 'py',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                                         'templates_dict': 'json'},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'ui_color': '#ffefeb',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'ui_fgcolor': '#000',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -           'weight_rule': 'downstream'},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'task_instance': {'dag_id': 'context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'end_date': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'executor': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'executor_config': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'key': ['context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                            'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                            'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                            1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                            -1],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'map_index': -1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'pool': 'default_pool',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'priority_weight': 1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'queue': 'default',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'run_as_user': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'run_id': 'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'start_date': '2024-07-26T14:57:32.992994+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'state': 'running',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'task_id': 'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                    'try_number': 1},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'task_instance_key_str': 'context_sample__print_context__20240726',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'test_mode': False,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ti': {'dag_id': 'context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'end_date': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'executor': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'executor_config': {},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'key': ['context_sample',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                 'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                 'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                 1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -                 -1],
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'map_index': -1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'pool': 'default_pool',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'priority_weight': 1,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'queue': 'default',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'run_as_user': None,
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'run_id': 'manual__2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'start_date': '2024-07-26T14:57:32.992994+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'state': 'running',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'task_id': 'print_context',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -         'try_number': 1},
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ts': '2024-07-26T14:57:31.727299+00:00',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ts_nodash': '20240726T145731',
[2024-07-26, 14:57:41 UTC] {process_utils.py:191} INFO -  'ts_nodash_with_tz': '20240726T145731.727299+0000'}
[2024-07-26, 14:57:41 UTC] {python.py:240} INFO - Done. Returned value was: None
[2024-07-26, 14:57:41 UTC] {taskinstance.py:341} ▼ Post task execution logs
[2024-07-26, 14:57:41 UTC] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=context_sample, task_id=print_context, run_id=manual__2024-07-26T14:57:31.727299+00:00, execution_date=20240726T145731, start_date=20240726T145732, end_date=20240726T145741
[2024-07-26, 14:57:41 UTC] {local_task_job_runner.py:261} INFO - Task exited with return code 0
[2024-07-26, 14:57:41 UTC] {taskinstance.py:3847} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-07-26, 14:57:41 UTC] {local_task_job_runner.py:240} ▲▲▲ Log group end

@gyli
Copy link
Contributor

gyli commented Jul 31, 2024

This PR looks good to me, at least for Airflow 2.

While the down side is it creates another version of serialized context in a sort of manual way and outside of serializer. I think ideally in Airflow 3, it can utilize serde to serialize to entire context, and then translate it to a more human readable json in this case.

Copy link
Contributor

@shahar1 shahar1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall - had some minor comments in the code plus a couple of questions:

  1. Did you check that it works for both PythonVirtualenvOperator and ExternalPythonOperator? (both inherit from _BasePythonVirtualenvOperator)
  2. Is there a way to test both operators above with this feature as part of the tests in tests/operators/test_python.py?

Also ,it would be great to add a newsfragment, so other people would be aware of this feature.

airflow/utils/python_virtualenv.py Outdated Show resolved Hide resolved
airflow/utils/python_virtualenv.py Outdated Show resolved Hide resolved
airflow/operators/python.py Show resolved Hide resolved
@phi-friday
Copy link
Contributor Author

phi-friday commented Aug 3, 2024

LGTM overall - had some minor comments in the code plus a couple of questions:

  1. Did you check that it works for both PythonVirtualenvOperator and ExternalPythonOperator? (both inherit from _BasePythonVirtualenvOperator)
  2. Is there a way to test both operators above with this feature as part of the tests in tests/operators/test_python.py?

Also ,it would be great to add a newsfragment, so other people would be aware of this feature.

I moved test code from TestPythonVirtualenvOperator to BaseTestPythonVirtualenvOperator.
For BranchMixIn, the return value is fixed to list[str(task_id)], so I treated it as a special case.

Copy link
Contributor

@shahar1 shahar1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing!
To wrap it up, could you please add a newsfragment? (feature type)

@phi-friday
Copy link
Contributor Author

Amazing! To wrap it up, could you please add a newsfragment? (feature type)

Added a newsfragment.
Is this the right way to do it?

@uranusjr
Copy link
Member

uranusjr commented Aug 5, 2024

One question, why can we not always provide the context when possible? Does this have a negative effect when the context is not needed?

@phi-friday
Copy link
Contributor Author

One question, why can we not always provide the context when possible? Does this have a negative effect when the context is not needed?

I used use_airflow_context to separate them because of the serialization cost.
It's small compared to the cost of generating venv, so it doesn't seem to matter.
In any case, would it be better to provide it if possible?

@potiuk
Copy link
Member

potiuk commented Aug 5, 2024

One question, why can we not always provide the context when possible? Does this have a negative effect when the context is not needed?

I think it's a bit safer approach this way.

It could be, for example that we miss some context serialization "glitch" that will make serialization to crash. Or maybe a future version of Airflow will add a bug there, or maybe someones python code already created "MockPython" object and the script will fail to parse (not very likely but I've seen stranger things in Airflow's vast user base).

Python operators are used for various things and I think any change in their scripts should be made in a very "safe" way.

@phi-friday phi-friday force-pushed the feat-venv-context branch 2 times, most recently from 5752e4a to 4501304 Compare August 6, 2024 10:36
@shahar1 shahar1 merged commit da55393 into apache:main Aug 8, 2024
49 checks passed
@phi-friday phi-friday deleted the feat-venv-context branch August 9, 2024 04:07
ephraimbuddy added a commit to astronomer/airflow that referenced this pull request Aug 9, 2024
@phi-friday phi-friday restored the feat-venv-context branch August 9, 2024 16:37
ephraimbuddy added a commit that referenced this pull request Aug 9, 2024
ephraimbuddy added a commit that referenced this pull request Aug 9, 2024
This reverts commit da55393.

(cherry picked from commit 0a06cc6)
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Aug 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants