diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py index 18e047ca070..8551f298212 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py @@ -3,6 +3,7 @@ from google.cloud import bigquery from google.protobuf.struct_pb2 import Struct +from google.protobuf import json_format from flytekit.extend import SerializationSettings, SQLTask from flytekit.models import task as _task_model @@ -15,8 +16,8 @@ class BigQueryConfig(object): BigQueryConfig should be used to configure a BigQuery Task. """ - Location: Optional[str] - ProjectID: Optional[str] + Location: str + ProjectID: Optional[str] = None QueryJobConfig: Optional[bigquery.QueryJobConfig] = None @@ -65,16 +66,15 @@ def __init__( self._output_schema_type = output_schema_type def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: - config: dict = self.task_config.QueryJobConfig.to_api_repr()["query"] - config.update( - { + config = { "Location": self.task_config.Location, "ProjectID": self.task_config.ProjectID, } - ) + if self.task_config.QueryJobConfig is not None: + config.update(self.task_config.QueryJobConfig.to_api_repr()["query"]) s = Struct() s.update(config) - return s + return json_format.MessageToDict(s) def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]: sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)