Skip to content

Commit

Permalink
Remove all deprecation warnings in providers
Browse files Browse the repository at this point in the history
There were a number of deprecation warnings when just importing
certain providers or their examples. Some of them came from still
using 1.10 classes, some from using still `apply_defaults` and
some from badly implemented fallback mechanism to support
backwards compatibility.

This has all been fixed and our documentation generation step
for providers in CI will also fail in case there are some new
DeprecationWarnings generated, which means that they will have
to be fixed before merging.

While checking that we found that BigQuery table_resource new
approach had some inconsistencies when passing new `table_resource`
parameters. Some deprecated parameters should be None by default,
otherwise you'd have to explicitly set them to None when
`table_resource` is set but when it is not set, the backwards
compatible behaviour is maintained - the parameters get the same.
default values set.

Several deprecation warnings had wrong stack-level - this has been
fixed as well.

The "legitimate" warnings are now filtered out and they are only
filtered out for the appropriate modules we know they are generated
from or in case the warnings result from direct import of the
deprecated module.
  • Loading branch information
potiuk committed Aug 30, 2021
1 parent 95279bd commit 27b1991
Show file tree
Hide file tree
Showing 34 changed files with 607 additions and 441 deletions.
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/dms_create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.utils.decorators import apply_defaults


class DmsCreateTaskOperator(BaseOperator):
Expand Down Expand Up @@ -68,7 +67,6 @@ class DmsCreateTaskOperator(BaseOperator):
"create_task_kwargs": "json",
}

@apply_defaults
def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/dms_delete_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.utils.decorators import apply_defaults


class DmsDeleteTaskOperator(BaseOperator):
Expand All @@ -45,7 +44,6 @@ class DmsDeleteTaskOperator(BaseOperator):
template_ext = ()
template_fields_renderers = {}

@apply_defaults
def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/dms_describe_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.utils.decorators import apply_defaults


class DmsDescribeTasksOperator(BaseOperator):
Expand All @@ -41,7 +40,6 @@ class DmsDescribeTasksOperator(BaseOperator):
template_ext = ()
template_fields_renderers = {'describe_tasks_kwargs': 'json'}

@apply_defaults
def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/dms_start_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.utils.decorators import apply_defaults


class DmsStartTaskOperator(BaseOperator):
Expand Down Expand Up @@ -54,7 +53,6 @@ class DmsStartTaskOperator(BaseOperator):
template_ext = ()
template_fields_renderers = {'start_task_kwargs': 'json'}

@apply_defaults
def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/dms_stop_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.utils.decorators import apply_defaults


class DmsStopTaskOperator(BaseOperator):
Expand All @@ -41,7 +40,6 @@ class DmsStopTaskOperator(BaseOperator):
template_ext = ()
template_fields_renderers = {}

@apply_defaults
def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/amazon/aws/operators/emr_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
from airflow.utils.decorators import apply_defaults


class EMRContainerOperator(BaseOperator):
Expand Down Expand Up @@ -63,7 +62,6 @@ class EMRContainerOperator(BaseOperator):
template_fields = ["name", "virtual_cluster_id", "execution_role_arn", "release_label", "job_driver"]
ui_color = "#f9c915"

@apply_defaults
def __init__( # pylint: disable=too-many-arguments
self,
*,
Expand Down
3 changes: 0 additions & 3 deletions airflow/providers/amazon/aws/sensors/dms_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.dms import DmsHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class DmsTaskBaseSensor(BaseSensorOperator):
Expand All @@ -45,7 +44,6 @@ class DmsTaskBaseSensor(BaseSensorOperator):
template_fields = ['replication_task_arn']
template_ext = ()

@apply_defaults
def __init__(
self,
replication_task_arn: str,
Expand Down Expand Up @@ -104,7 +102,6 @@ class DmsTaskCompletedSensor(DmsTaskBaseSensor):
template_fields = ['replication_task_arn']
template_ext = ()

@apply_defaults
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.target_statuses = ['stopped']
Expand Down
2 changes: 0 additions & 2 deletions airflow/providers/apache/drill/operators/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from airflow.models import BaseOperator
from airflow.providers.apache.drill.hooks.drill import DrillHook
from airflow.utils.decorators import apply_defaults


class DrillOperator(BaseOperator):
Expand All @@ -48,7 +47,6 @@ class DrillOperator(BaseOperator):
template_ext = ('.sql',)
ui_color = '#ededed'

@apply_defaults
def __init__(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount


def _convert_kube_model_object(obj, old_class, new_class):
Expand Down Expand Up @@ -54,6 +52,8 @@ def convert_volume(volume) -> k8s.V1Volume:
:param volume:
:return: k8s.V1Volume
"""
from airflow.providers.cncf.kubernetes.backcompat.volume import Volume

return _convert_kube_model_object(volume, Volume, k8s.V1Volume)


Expand All @@ -64,6 +64,8 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
:param volume_mount:
:return: k8s.V1VolumeMount
"""
from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount

return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
BigQueryDeleteTableOperator,
BigQueryGetDatasetOperator,
BigQueryGetDatasetTablesOperator,
BigQueryPatchDatasetOperator,
BigQueryUpdateDatasetOperator,
BigQueryUpdateTableOperator,
BigQueryUpdateTableSchemaOperator,
Expand Down Expand Up @@ -134,14 +133,26 @@
# [START howto_operator_bigquery_create_external_table]
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": "external_table",
},
"schema": {
"fields": [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
},
"externalDataConfiguration": {
"sourceFormat": "CSV",
"compression": "NONE",
"csvOptions": {"skipLeadingRows": 1},
},
},
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
skip_leading_rows=1,
schema_fields=[
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
],
)
# [END howto_operator_bigquery_create_external_table]

Expand Down Expand Up @@ -191,17 +202,6 @@
)
# [END howto_operator_bigquery_update_table]

# [START howto_operator_bigquery_patch_dataset]
patch_dataset = BigQueryPatchDatasetOperator(
task_id="patch_dataset",
dataset_id=DATASET_NAME,
dataset_resource={
"friendlyName": "Patched Dataset",
"description": "Patched dataset",
},
)
# [END howto_operator_bigquery_patch_dataset]

# [START howto_operator_bigquery_update_dataset]
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
Expand All @@ -216,7 +216,7 @@
)
# [END howto_operator_bigquery_delete_dataset]

create_dataset >> patch_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset
create_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset

(
update_dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
BigQueryDeleteDatasetOperator,
BigQueryExecuteQueryOperator,
BigQueryGetDataOperator,
BigQueryInsertJobOperator,
BigQueryIntervalCheckOperator,
Expand Down Expand Up @@ -125,25 +124,40 @@
)
# [END howto_operator_bigquery_select_job]

execute_insert_query = BigQueryExecuteQueryOperator(
task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, use_legacy_sql=False, location=location
execute_insert_query = BigQueryInsertJobOperator(
task_id="execute_insert_query",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
}
},
location=location,
)

bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
bigquery_execute_multi_query = BigQueryInsertJobOperator(
task_id="execute_multi_query",
sql=[
f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
],
use_legacy_sql=False,
configuration={
"query": {
"query": [
f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
],
"useLegacySql": False,
}
},
location=location,
)

execute_query_save = BigQueryExecuteQueryOperator(
execute_query_save = BigQueryInsertJobOperator(
task_id="execute_query_save",
sql=f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
destination_dataset_table=f"{DATASET_NAME}.{TABLE_2}",
configuration={
"query": {
"query": f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
"useLegacySql": False,
"destinationTable": f"{DATASET_NAME}.{TABLE_2}",
}
},
location=location,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
BigQueryDeleteDatasetOperator,
BigQueryExecuteQueryOperator,
BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.sensors.bigquery import (
BigQueryTableExistenceSensor,
Expand Down Expand Up @@ -80,8 +80,14 @@
)
# [END howto_sensor_bigquery_table]

execute_insert_query = BigQueryExecuteQueryOperator(
task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, use_legacy_sql=False
execute_insert_query = BigQueryInsertJobOperator(
task_id="execute_insert_query",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
}
},
)

# [START howto_sensor_bigquery_table_partition]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.datacatalog import (
CloudDataCatalogCreateEntryGroupOperator,
Expand All @@ -47,7 +48,6 @@
CloudDataCatalogUpdateTagTemplateOperator,
)
from airflow.utils.dates import days_ago
from airflow.utils.helpers import chain

PROJECT_ID = "polidea-airflow"
LOCATION = "us-central1"
Expand Down
Loading

0 comments on commit 27b1991

Please sign in to comment.