From aab79fb3dfcd98ba2dbbb9e3932ee46182e5045d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 24 Dec 2021 00:23:38 +0800 Subject: [PATCH] update get_custom Signed-off-by: Kevin Su --- .../flytekitplugins/bigquery/task.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py index 18e047ca070..8551f298212 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py @@ -3,6 +3,7 @@ from google.cloud import bigquery from google.protobuf.struct_pb2 import Struct +from google.protobuf import json_format from flytekit.extend import SerializationSettings, SQLTask from flytekit.models import task as _task_model @@ -15,8 +16,8 @@ class BigQueryConfig(object): BigQueryConfig should be used to configure a BigQuery Task. """ - Location: Optional[str] - ProjectID: Optional[str] + Location: str + ProjectID: Optional[str] = None QueryJobConfig: Optional[bigquery.QueryJobConfig] = None @@ -65,16 +66,15 @@ def __init__( self._output_schema_type = output_schema_type def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: - config: dict = self.task_config.QueryJobConfig.to_api_repr()["query"] - config.update( - { + config = { "Location": self.task_config.Location, "ProjectID": self.task_config.ProjectID, } - ) + if self.task_config.QueryJobConfig is not None: + config.update(self.task_config.QueryJobConfig.to_api_repr()["query"]) s = Struct() s.update(config) - return s + return json_format.MessageToDict(s) def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]: sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)