Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610)

* Enable pull requests to be run from v*test branches (#41474) (#41476)

Since we switched from directly pushing cherry-picks to opening PRs
against the v*test branches, we should enable PRs to run for those
target branches.

(cherry picked from commit a9363e6)

* Prevent provider lowest-dependency tests from running in non-main branches (#41478) (#41481)

When running tests in the v2-10-test branch, lowest-dependency tests
were being run for providers, because the "skip_provider_tests" flag
was not used to filter them out when calculating the separate test types.

This PR fixes it.

(cherry picked from commit 75da507)
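
As a rough illustration (hypothetical helper, not the actual breeze
selective-checks code), the fix amounts to dropping provider test types
from the computed list whenever provider tests are skipped for the
target branch:

    # Hypothetical sketch of honouring skip_provider_tests when building
    # the list of test types to run; names are illustrative only.
    def filter_test_types(test_types: list[str], skip_provider_tests: bool) -> list[str]:
        if not skip_provider_tests:
            return test_types
        # Drop "Providers" and "Providers[...]" entries so lowest-dependency
        # provider tests are not scheduled on non-main branches.
        return [t for t in test_types if not t.startswith("Providers")]

    print(filter_test_types(["Core", "Providers[amazon]", "WWW"], skip_provider_tests=True))
    # ['Core', 'WWW']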

* Make PROD image building work in non-main PRs (#41480) (#41484)

PROD image building currently fails in non-main PRs because it
attempts to build provider packages from sources rather than use them
from PyPI when the PR is run against a "v*-test" branch.

This PR fixes it:

* PROD images in non-main-targeted builds will pull providers from
  PyPI rather than build them
* they use PyPI constraints to install the providers
* they use UV, which should speed up building of the images

(cherry picked from commit 4d5f1c4)

* Add WebEncoder for trigger page rendering to avoid render failure (#41350) (#41485)

Co-authored-by: M. Olcay Tercanlı <[email protected]>
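
The idea behind the fix, sketched below with a simplified encoder
(hypothetical code, not necessarily Airflow's exact WebEncoder), is to
render the trigger form with a JSON encoder that degrades gracefully
instead of raising on values the default encoder cannot handle:

    import datetime
    import json

    class WebEncoder(json.JSONEncoder):
        """Sketch: fall back to ISO dates / str() instead of failing."""

        def default(self, obj):
            if isinstance(obj, (datetime.date, datetime.datetime)):
                return obj.isoformat()
            try:
                return str(obj)
            except Exception:
                return super().default(obj)

    # Rendering DAG params containing a datetime no longer breaks the trigger page.
    print(json.dumps({"logical_date": datetime.datetime(2024, 8, 22)}, cls=WebEncoder))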

* Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535)

* Fix for issue #39336

* removed unnecessary import

(cherry picked from commit dd3c3a7)

Co-authored-by: Howard Yoo <[email protected]>
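
An illustrative sketch (hypothetical helper, not Airflow's actual OTEL
code) of why the subtraction was a problem: when span ids are derived
from the task-instance key, an executor reporting try_number - 1 while
the rest of the system reports try_number produces ids that never match,
and a first attempt can even end up with a value of 0:

    import hashlib

    def span_id_for(dag_id: str, run_id: str, task_id: str, try_number: int) -> int:
        # Hypothetical: derive a deterministic 64-bit span id from the TI key.
        key = f"{dag_id}:{run_id}:{task_id}:{try_number}".encode()
        return int.from_bytes(hashlib.sha256(key).digest()[:8], "big")

    # The executor's id (try_number - 1) never matched the scheduler's (try_number);
    # using the unmodified try_number everywhere fixes it.
    assert span_id_for("dag", "run", "task", 1) != span_id_for("dag", "run", "task", 0)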

* Fix failing pydantic v1 tests (#41534) (#41541)

We need to exclude some versions of Pydantic v1 because they conflict
with the aws provider.

(cherry picked from commit a033c5f)

* Fix Non-DB test calculation for main builds (#41499) (#41543)

Pytest has a quirk where it will not collect tests from a parent
folder when one of its subfolders is specified after the parent folder
(for example, an invocation like `pytest tests/providers tests/providers/google`
can end up skipping other tests under `tests/providers`). This caused
some non-DB tests from the providers folder to be skipped during main builds.

The regression in pytest 8.2 (this used to work before) is tracked at
pytest-dev/pytest#12605

(cherry picked from commit d489826)

* Add changelog for airflow python client 2.10.0 (#41583) (#41584)

* Add changelog for airflow python client 2.10.0

* Update client version

(cherry picked from commit 317a28e)

* Make all tests pass in Database Isolation mode (#41567)

This adds a dedicated "DatabaseIsolation" test job to the airflow
v2-10-test branch.

The DatabaseIsolation job will run all "db-tests" with DB isolation
mode enabled and the `internal-api` component running; groups of
tests marked with "skip-if-database-isolation" will be skipped.
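
For example, a test that needs direct access to the metadata DB can be
marked so it is skipped when the suite runs in isolation mode. A minimal
sketch, assuming the DATABASE_ISOLATION environment variable used by the
CI workflow (the real Airflow marker wiring may differ slightly):

    import os

    import pytest

    DB_ISOLATION = os.environ.get("DATABASE_ISOLATION", "false") == "true"

    skip_if_database_isolation = pytest.mark.skipif(
        DB_ISOLATION, reason="requires direct metadata DB access, unavailable in isolation mode"
    )

    @skip_if_database_isolation
    def test_direct_db_query():
        ...  # talks to the metadata DB directly, so it is skipped in isolation mode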

* Upgrade build and chart dependencies (#41570) (#41588)

(cherry picked from commit c88192c)

Co-authored-by: Jarek Potiuk <[email protected]>

* Limit watchtower as a dependency, as 3.3.0 breaks main. (#41612)

(cherry picked from commit 1b602d5)

* Enable running Pull Requests against v2-10-stable branch (#41624)

(cherry picked from commit e306e7f)

* Fix tests/models/test_variable.py for database isolation mode (#41414)

* Fix tests/models/test_variable.py for database isolation mode

* Review feedback

(cherry picked from commit 736ebfe)

* Make latest botocore tests green (#41626)

The latest botocore tests conflict with a few requirements, and until
the upcoming apache-beam version is released we need some manual
exclusions. Those exclusions should make the latest botocore tests
green again.

(cherry picked from commit a13ccbb)

* Simpler task retrieval for taskinstance test (#41389)

The test had been updated for DB isolation, but the way the task was
retrieved was not intuitive and could possibly lead to flaky tests.

(cherry picked from commit f25adf1)

* Skip database isolation case for task mapping taskinstance tests (#41471)

Related: #41067
(cherry picked from commit 7718bd7)

* Skipping tests for db isolation because similar tests were skipped (#41450)

(cherry picked from commit e94b508)

---------

Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
Co-authored-by: M. Olcay Tercanlı <[email protected]>
Co-authored-by: Howard Yoo <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Bugra Ozturk <[email protected]>
7 people authored Aug 22, 2024
1 parent 535f27b commit 91f8265
Showing 54 changed files with 350 additions and 98 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -23,7 +23,7 @@ on: # yamllint disable-line rule:truthy
push:
branches: ['v[0-9]+-[0-9]+-test']
pull_request:
branches: ['main']
branches: ['main', 'v[0-9]+-[0-9]+-test', 'v[0-9]+-[0-9]+-stable']
workflow_dispatch:
permissions:
# All other permissions are set to none
6 changes: 6 additions & 0 deletions .github/workflows/run-unit-tests.yml
@@ -104,6 +104,11 @@ on: # yamllint disable-line rule:truthy
required: false
default: "true"
type: string
database-isolation:
description: "Whether to enable database isolattion or not (true/false)"
required: false
default: "false"
type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or not (true/false)"
required: false
@@ -152,6 +157,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
23 changes: 23 additions & 0 deletions .github/workflows/special-tests.yml
@@ -193,6 +193,29 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-database-isolation:
name: "Database isolation test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "true"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
backend-versions: "['${{ inputs.default-postgres-version }}']"
excludes: "[]"
parallel-test-types-list-as-string: ${{ inputs.parallel-test-types-list-as-string }}
include-success-outputs: ${{ needs.build-info.outputs.include-success-outputs }}
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-quarantined:
name: "Quarantined test"
uses: ./.github/workflows/run-unit-tests.yml
2 changes: 1 addition & 1 deletion Dockerfile
@@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.9.3"
ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm"

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
16 changes: 9 additions & 7 deletions Dockerfile.ci
@@ -1022,16 +1022,17 @@ function check_boto_upgrade() {
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}"
echo
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud || true
${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud opensearch-py || true
# We need to include few dependencies to pass pip check with other dependencies:
# * oss2 as dependency as otherwise jmespath will be bumped (sync with alibaba provider)
# * gcloud-aio-auth limit is needed to be included as it bumps cryptography (sync with google provider)
# * cryptography is kept for snowflake-connector-python limitation (sync with snowflake provider)
# * requests needs to be limited to be compatible with apache beam (sync with apache-beam provider)
# * yandexcloud requirements for requests does not match those of apache.beam and latest botocore
# Both requests and yandexcloud exclusion above might be removed after
# https://github.com/apache/beam/issues/32080 is addressed
# When you remove yandexcloud from the above list, also remove it from "test_example_dags.py"
# in "tests/always".
# This is already addressed and planned for 2.59.0 release.
# When you remove yandexcloud and opensearch from the above list, you can also remove the
# optional providers_dependencies exclusions from "test_example_dags.py" in "tests/always".
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \
@@ -1068,8 +1069,9 @@ function check_pydantic() {
echo
echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
echo
# Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we need to exclude it
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0"
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0,!=1.10.17,!=1.10.15"
pip check
else
echo
@@ -1310,7 +1312,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
@@ -1334,7 +1336,7 @@ ARG AIRFLOW_VERSION=""
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"

ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
15 changes: 8 additions & 7 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]:
# XCom.get_many, # Not supported because it returns query
XCom.clear,
XCom.set,
Variable.set,
Variable.update,
Variable.delete,
Variable._set,
Variable._update,
Variable._delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
@@ -228,19 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.info("Calling method %s\nparams: %s", method_name, params)
log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.info("Sending response: %s", response)
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except AirflowException as e: # In case of AirflowException transport the exception class back to caller
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.info("Sending exception response: %s", response)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
2 changes: 1 addition & 1 deletion airflow/api_internal/internal_api_call.py
@@ -159,7 +159,7 @@ def wrapper(*args, **kwargs):
if result is None or result == b"":
return None
result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True)
if isinstance(result, AirflowException):
if isinstance(result, (KeyError, AttributeError, AirflowException)):
raise result
return result

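In effect, the internal-API client now treats a few known builtin
exception types the same way it already treated AirflowException: if the
deserialized response turns out to be one of them, it is re-raised on the
caller's side. A simplified sketch of that client-side handling (not the
full decorator):

    import json

    from airflow.exceptions import AirflowException
    from airflow.serialization.serialized_objects import BaseSerialization

    def handle_internal_api_response(raw_response: bytes):
        """Sketch: deserialize the RPC response and re-raise known exception types."""
        if not raw_response:
            return None
        result = BaseSerialization.deserialize(json.loads(raw_response), use_pydantic_models=True)
        if isinstance(result, (KeyError, AttributeError, AirflowException)):
            raise result  # surfaces to the caller as if it had happened locally
        return result
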
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)

self.change_state(key, TaskInstanceState.SUCCESS, info)

2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(command))

local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(self.commands_to_run))

def sync(self) -> None:
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->
span.set_attribute("hostname", ti.hostname)
span.set_attribute("log_url", ti.log_url)
span.set_attribute("operator", str(ti.operator))
span.set_attribute("try_number", ti.try_number - 1)
span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
@@ -1516,10 +1516,11 @@ def run(
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()

session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
66 changes: 63 additions & 3 deletions airflow/models/variable.py
@@ -154,7 +154,6 @@ def get(

@staticmethod
@provide_session
@internal_api_call
def set(
key: str,
value: Any,
@@ -167,6 +166,35 @@ def set(
This operation overwrites an existing variable.
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._set(
key=key, value=value, description=description, serialize_json=serialize_json, session=session
)
# invalidate key in cache for faster propagation
# we cannot save the value set because it's possible that it's shadowed by a custom backend
# (see call to check_for_write_conflict above)
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _set(
key: str,
value: Any,
description: str | None = None,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Set a value for an Airflow Variable with a given Key.
This operation overwrites an existing variable.
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
@@ -190,7 +218,6 @@ def set(

@staticmethod
@provide_session
@internal_api_call
def update(
key: str,
value: Any,
@@ -200,6 +227,27 @@ def update(
"""
Update a given Airflow Variable with the Provided value.
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._update(key=key, value=value, serialize_json=serialize_json, session=session)
# We need to invalidate the cache for internal API cases on the client side
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _update(
key: str,
value: Any,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Update a given Airflow Variable with the Provided value.
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
@@ -219,11 +267,23 @@ def update(

@staticmethod
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
:param key: Variable Keys
"""
rows = Variable._delete(key=key, session=session)
SecretCache.invalidate_variable(key)
return rows

@staticmethod
@provide_session
@internal_api_call
def _delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
:param key: Variable Keys
"""
rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
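
The pattern used here: the public Variable.set/update/delete keep their
signatures and behaviour for callers, delegate the DB work to new
_set/_update/_delete methods that carry @internal_api_call (so they can be
routed to the internal API in database-isolation mode), and then invalidate
the local SecretCache on the calling side. A minimal usage sketch, assuming
a configured Airflow environment:

    from airflow.models import Variable

    # The public API is unchanged for callers; in database-isolation mode the
    # underlying Variable._set() call is routed through the internal API, while
    # SecretCache invalidation still happens locally in this process.
    Variable.set(key="my_key", value={"foo": "bar"}, serialize_json=True)
    print(Variable.get("my_key", deserialize_json=True))
    Variable.delete("my_key")
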
2 changes: 1 addition & 1 deletion airflow/providers/amazon/provider.yaml
@@ -101,7 +101,7 @@ dependencies:
- botocore>=1.34.90
- inflection>=0.5.1
# Allow a wider range of watchtower versions for flexibility among users
- watchtower>=3.0.0,<4
- watchtower>=3.0.0,!=3.3.0,<4
- jsonpath_ng>=1.5.3
- redshift_connector>=2.0.918
- sqlalchemy_redshift>=0.8.6
1 change: 1 addition & 0 deletions airflow/serialization/enums.py
@@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum):
RELATIVEDELTA = "relativedelta"
BASE_TRIGGER = "base_trigger"
AIRFLOW_EXC_SER = "airflow_exc_ser"
BASE_EXC_SER = "base_exc_ser"
DICT = "dict"
SET = "set"
TUPLE = "tuple"
16 changes: 14 additions & 2 deletions airflow/serialization/serialized_objects.py
@@ -692,6 +692,15 @@ def serialize(
),
type_=DAT.AIRFLOW_EXC_SER,
)
elif isinstance(var, (KeyError, AttributeError)):
return cls._encode(
cls.serialize(
{"exc_cls_name": var.__class__.__name__, "args": [var.args], "kwargs": {}},
use_pydantic_models=use_pydantic_models,
strict=strict,
),
type_=DAT.BASE_EXC_SER,
)
elif isinstance(var, BaseTrigger):
return cls._encode(
cls.serialize(var.serialize(), use_pydantic_models=use_pydantic_models, strict=strict),
@@ -834,13 +843,16 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return decode_timezone(var)
elif type_ == DAT.RELATIVEDELTA:
return decode_relativedelta(var)
elif type_ == DAT.AIRFLOW_EXC_SER:
elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
deser = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
exc_cls_name = deser["exc_cls_name"]
args = deser["args"]
kwargs = deser["kwargs"]
del deser
exc_cls = import_string(exc_cls_name)
if type_ == DAT.AIRFLOW_EXC_SER:
exc_cls = import_string(exc_cls_name)
else:
exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
elif type_ == DAT.BASE_TRIGGER:
tr_cls_name, kwargs = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
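
A small sketch of what the new BASE_EXC_SER branch enables: builtin
exceptions such as KeyError can now round-trip through BaseSerialization
(previously only AirflowException-style exceptions could), assuming an
Airflow environment is available:

    from airflow.serialization.serialized_objects import BaseSerialization

    # KeyError is encoded with the new BASE_EXC_SER type and its class is
    # re-imported from builtins on deserialization.
    encoded = BaseSerialization.serialize(KeyError("variable_key_missing"), use_pydantic_models=True)
    decoded = BaseSerialization.deserialize(encoded, use_pydantic_models=True)
    assert isinstance(decoded, KeyError)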