-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix error when creating an external table using a table resource #17998
Changes from 4 commits
7f2c5e7
985269e
2ccf1be
3af651f
2ae0ded
9f4ce24
c9a98fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1119,7 +1119,6 @@ def __init__( | |
# BQ config | ||
kwargs_passed = any( | ||
[ | ||
destination_project_dataset_table, | ||
schema_fields, | ||
source_format, | ||
compression, | ||
|
@@ -1184,6 +1183,11 @@ def execute(self, context) -> None: | |
location=self.location, | ||
impersonation_chain=self.impersonation_chain, | ||
) | ||
if self.table_resource: | ||
bq_hook.create_empty_table( | ||
table_resource=self.table_resource, | ||
) | ||
return | ||
|
||
if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP': | ||
gcs_hook = GCSHook( | ||
|
@@ -1195,36 +1199,24 @@ def execute(self, context) -> None: | |
else: | ||
schema_fields = self.schema_fields | ||
|
||
if schema_fields and self.table_resource: | ||
self.table_resource["externalDataConfiguration"]["schema"] = schema_fields | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does removing this logic break backward compatibility? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Considering it was incorrectly introduced and table creation via table resource is not properly supported, I don't think this will break backward compatibility. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agree. |
||
|
||
if self.table_resource: | ||
tab_ref = TableReference.from_string(self.destination_project_dataset_table) | ||
bq_hook.create_empty_table( | ||
table_resource=self.table_resource, | ||
project_id=tab_ref.project, | ||
table_id=tab_ref.table_id, | ||
dataset_id=tab_ref.dataset_id, | ||
) | ||
else: | ||
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] | ||
|
||
bq_hook.create_external_table( | ||
external_project_dataset_table=self.destination_project_dataset_table, | ||
schema_fields=schema_fields, | ||
source_uris=source_uris, | ||
source_format=self.source_format, | ||
compression=self.compression, | ||
skip_leading_rows=self.skip_leading_rows, | ||
field_delimiter=self.field_delimiter, | ||
max_bad_records=self.max_bad_records, | ||
quote_character=self.quote_character, | ||
allow_quoted_newlines=self.allow_quoted_newlines, | ||
allow_jagged_rows=self.allow_jagged_rows, | ||
src_fmt_configs=self.src_fmt_configs, | ||
labels=self.labels, | ||
encryption_configuration=self.encryption_configuration, | ||
) | ||
source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] | ||
|
||
bq_hook.create_external_table( | ||
external_project_dataset_table=self.destination_project_dataset_table, | ||
schema_fields=schema_fields, | ||
source_uris=source_uris, | ||
source_format=self.source_format, | ||
compression=self.compression, | ||
skip_leading_rows=self.skip_leading_rows, | ||
field_delimiter=self.field_delimiter, | ||
max_bad_records=self.max_bad_records, | ||
quote_character=self.quote_character, | ||
allow_quoted_newlines=self.allow_quoted_newlines, | ||
allow_jagged_rows=self.allow_jagged_rows, | ||
src_fmt_configs=self.src_fmt_configs, | ||
labels=self.labels, | ||
encryption_configuration=self.encryption_configuration, | ||
) | ||
|
||
|
||
class BigQueryDeleteDatasetOperator(BaseOperator): | ||
|
@@ -1492,7 +1484,6 @@ def __init__( | |
impersonation_chain: Optional[Union[str, Sequence[str]]] = None, | ||
**kwargs, | ||
) -> None: | ||
|
||
self.dataset_id = dataset_id | ||
self.project_id = project_id | ||
self.gcp_conn_id = gcp_conn_id | ||
|
@@ -1641,7 +1632,6 @@ def __init__( | |
impersonation_chain: Optional[Union[str, Sequence[str]]] = None, | ||
**kwargs, | ||
) -> None: | ||
|
||
warnings.warn( | ||
"This operator is deprecated. Please use BigQueryUpdateDatasetOperator.", | ||
DeprecationWarning, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this should be restored now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call.