Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix setting project_id for gs to bq and bq to gs #30053

Merged
merged 8 commits into from
Mar 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _submit_job(

return hook.insert_job(
configuration=configuration,
project_id=hook.project_id,
project_id=configuration["extract"]["sourceTable"]["projectId"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yaro1 May I know the reason why we hard-code the project id from sourceTable? We ran into an issue when trying to extract data from Project A while submitting the job using our own Project B. This line does not allow us to use our default project id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sleepy-tiger I agree, we also have this issue. I think the original bug report was based on a misunderstanding of the error, and while this fix does incidentally support the reporter's gcp configuration, I think that is mostly an accident, and it also breaks many other use cases.

see also https://github.com/apache/airflow/pull/30053/files#diff-875bf3d1bfbba7067dc754732c0e416b8ebe7a5b722bc9ac428b98934f04a16fR512 and https://github.com/apache/airflow/pull/30053/files#diff-875bf3d1bfbba7067dc754732c0e416b8ebe7a5b722bc9ac428b98934f04a16fR587, which override the project_id that the user passes in, making it impossible to use a project_id other than what is specified in the source or destination tables. In general, more clarity is needed in distinguishing between which projects are being used for storage, and which are being used for compute.

I plan on filing an issue about this later today if one doesn't already exist, and I'll update here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was closing old tabs and realized I never updated here -- the issue I filed is here: #32106, and it has been resolved to my satisfaction. You can find links to the relevant conversations from that issue, it got kind of complicated with multiple issues filed and such

location=self.location,
job_id=job_id,
timeout=self.result_timeout,
Expand Down
21 changes: 13 additions & 8 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def __init__(
job_id: str | None = None,
force_rerun: bool = True,
reattach_states: set[str] | None = None,
project_id: str | None = None,
**kwargs,
) -> None:

Expand All @@ -249,6 +250,7 @@ def __init__(

# BQ config
self.destination_project_dataset_table = destination_project_dataset_table
self.project_id = project_id
self.schema_fields = schema_fields
if source_format.upper() not in ALLOWED_FORMATS:
raise ValueError(
Expand Down Expand Up @@ -306,7 +308,7 @@ def _submit_job(
# Submit a new job without waiting for it to complete.
return hook.insert_job(
configuration=self.configuration,
project_id=hook.project_id,
project_id=self.project_id,
location=self.location,
job_id=job_id,
timeout=self.result_timeout,
Expand Down Expand Up @@ -507,9 +509,9 @@ def _find_max_value_in_column(self):
raise RuntimeError(f"The {select_command} returned no rows!")

def _create_empty_table(self):
project_id, dataset_id, table_id = self.hook.split_tablename(
self.project_id, dataset_id, table_id = self.hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=self.hook.project_id or "",
default_project_id=self.project_id or self.hook.project_id,
)

external_config_api_repr = {
Expand Down Expand Up @@ -556,7 +558,7 @@ def _create_empty_table(self):

# build table definition
table = Table(
table_ref=TableReference.from_string(self.destination_project_dataset_table, project_id)
table_ref=TableReference.from_string(self.destination_project_dataset_table, self.project_id)
)
table.external_data_configuration = external_config
if self.labels:
Expand All @@ -573,15 +575,18 @@ def _create_empty_table(self):

self.log.info("Creating external table: %s", self.destination_project_dataset_table)
self.hook.create_empty_table(
table_resource=table_obj_api_repr, project_id=project_id, location=self.location, exists_ok=True
table_resource=table_obj_api_repr,
project_id=self.project_id,
location=self.location,
exists_ok=True,
)
self.log.info("External table created successfully: %s", self.destination_project_dataset_table)
return table_obj_api_repr

def _use_existing_table(self):
destination_project, destination_dataset, destination_table = self.hook.split_tablename(
self.project_id, destination_dataset, destination_table = self.hook.split_tablename(
table_input=self.destination_project_dataset_table,
default_project_id=self.hook.project_id or "",
default_project_id=self.project_id or self.hook.project_id,
var_name="destination_project_dataset_table",
)

Expand All @@ -601,7 +606,7 @@ def _use_existing_table(self):
"autodetect": self.autodetect,
"createDisposition": self.create_disposition,
"destinationTable": {
"projectId": destination_project,
"projectId": self.project_id,
"datasetId": destination_dataset,
"tableId": destination_table,
},
Expand Down
20 changes: 10 additions & 10 deletions tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -233,7 +233,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -342,7 +342,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook):
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -441,7 +441,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho
fieldDelimiter=",",
),
},
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -545,7 +545,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully(
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -645,7 +645,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down Expand Up @@ -746,7 +746,7 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_
"encoding": "UTF-8",
}
},
project_id=bq_hook.return_value.project_id,
project_id=bq_hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -842,7 +842,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self
"encoding": "UTF-8",
}
},
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
location=None,
job_id=pytest.real_job_id,
timeout=None,
Expand Down Expand Up @@ -1067,7 +1067,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=bq_hook.return_value.project_id,
project_id=bq_hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
),
Expand Down Expand Up @@ -1129,7 +1129,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self,
job_id=pytest.real_job_id,
location=None,
nowait=True,
project_id=hook.return_value.project_id,
project_id=hook.return_value.split_tablename.return_value[0],
retry=DEFAULT_RETRY,
timeout=None,
)
Expand Down