diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index e89d6a861343e..f7c13352614b3 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -29,7 +29,6 @@
 
 import attr
 from google.api_core.exceptions import Conflict
-from google.cloud.bigquery import TableReference
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, BaseOperatorLink
@@ -1184,6 +1183,11 @@ def execute(self, context) -> None:
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
+        if self.table_resource:
+            bq_hook.create_empty_table(
+                table_resource=self.table_resource,
+            )
+            return
 
         if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
             gcs_hook = GCSHook(
@@ -1195,36 +1199,24 @@ def execute(self, context) -> None:
         else:
             schema_fields = self.schema_fields
 
-        if schema_fields and self.table_resource:
-            self.table_resource["externalDataConfiguration"]["schema"] = schema_fields
-
-        if self.table_resource:
-            tab_ref = TableReference.from_string(self.destination_project_dataset_table)
-            bq_hook.create_empty_table(
-                table_resource=self.table_resource,
-                project_id=tab_ref.project,
-                table_id=tab_ref.table_id,
-                dataset_id=tab_ref.dataset_id,
-            )
-        else:
-            source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
-
-            bq_hook.create_external_table(
-                external_project_dataset_table=self.destination_project_dataset_table,
-                schema_fields=schema_fields,
-                source_uris=source_uris,
-                source_format=self.source_format,
-                compression=self.compression,
-                skip_leading_rows=self.skip_leading_rows,
-                field_delimiter=self.field_delimiter,
-                max_bad_records=self.max_bad_records,
-                quote_character=self.quote_character,
-                allow_quoted_newlines=self.allow_quoted_newlines,
-                allow_jagged_rows=self.allow_jagged_rows,
-                src_fmt_configs=self.src_fmt_configs,
-                labels=self.labels,
-                encryption_configuration=self.encryption_configuration,
-            )
+        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
+
+        bq_hook.create_external_table(
+            external_project_dataset_table=self.destination_project_dataset_table,
+            schema_fields=schema_fields,
+            source_uris=source_uris,
+            source_format=self.source_format,
+            compression=self.compression,
+            skip_leading_rows=self.skip_leading_rows,
+            field_delimiter=self.field_delimiter,
+            max_bad_records=self.max_bad_records,
+            quote_character=self.quote_character,
+            allow_quoted_newlines=self.allow_quoted_newlines,
+            allow_jagged_rows=self.allow_jagged_rows,
+            src_fmt_configs=self.src_fmt_configs,
+            labels=self.labels,
+            encryption_configuration=self.encryption_configuration,
+        )
 
 
 class BigQueryDeleteDatasetOperator(BaseOperator):
@@ -1492,7 +1484,6 @@ def __init__(
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
-
         self.dataset_id = dataset_id
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
@@ -1641,7 +1632,6 @@ def __init__(
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         **kwargs,
     ) -> None:
-
        warnings.warn(
            "This operator is deprecated. Please use BigQueryUpdateDatasetOperator.",
            DeprecationWarning,
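
For reviewers, a minimal sketch of the two code paths through `execute()` after this patch. The DAG scaffolding, task IDs, project/bucket names, and the `table_resource` payload below are hypothetical placeholders, not part of the change: when `table_resource` is supplied, the operator now goes straight to `BigQueryHook.create_empty_table()` and returns, so the schema/GCS keyword arguments are never consulted; otherwise it assembles `gs://` URIs from `bucket`/`source_objects` and calls `create_external_table()` as before.

```python
# Illustrative usage only; all names and values below are made up for the sketch.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
)

with DAG(dag_id="bq_external_table_example", start_date=datetime(2021, 1, 1)) as dag:
    # New short-circuit path: the full table_resource dict is handed to
    # BigQueryHook.create_empty_table() and execute() returns immediately.
    create_from_resource = BigQueryCreateExternalTableOperator(
        task_id="create_from_resource",
        table_resource={
            "tableReference": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "my_table",
            },
            "externalDataConfiguration": {
                "sourceFormat": "CSV",
                "sourceUris": ["gs://my-bucket/data/*.csv"],
            },
        },
    )

    # Keyword-argument path: source URIs are built from bucket/source_objects
    # and BigQueryHook.create_external_table() is called, as before the patch.
    create_from_kwargs = BigQueryCreateExternalTableOperator(
        task_id="create_from_kwargs",
        destination_project_dataset_table="my-project.my_dataset.my_table",
        bucket="my-bucket",
        source_objects=["data/part-0.csv"],
        schema_fields=[{"name": "id", "type": "INTEGER", "mode": "NULLABLE"}],
        source_format="CSV",
    )
```

Note that the old behavior of merging `schema_fields` into `table_resource["externalDataConfiguration"]["schema"]` is dropped: in the resource path, the `table_resource` dict is taken as-is.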