Skip to content

Commit

Permalink
update get_custom
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Dec 28, 2021
1 parent 09a1de3 commit aab79fb
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from google.cloud import bigquery
from google.protobuf.struct_pb2 import Struct
from google.protobuf import json_format

from flytekit.extend import SerializationSettings, SQLTask
from flytekit.models import task as _task_model
Expand All @@ -15,8 +16,8 @@ class BigQueryConfig(object):
BigQueryConfig should be used to configure a BigQuery Task.
"""

Location: Optional[str]
ProjectID: Optional[str]
Location: str
ProjectID: Optional[str] = None
QueryJobConfig: Optional[bigquery.QueryJobConfig] = None


Expand Down Expand Up @@ -65,16 +66,15 @@ def __init__(
self._output_schema_type = output_schema_type

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
config: dict = self.task_config.QueryJobConfig.to_api_repr()["query"]
config.update(
{
config = {
"Location": self.task_config.Location,
"ProjectID": self.task_config.ProjectID,
}
)
if self.task_config.QueryJobConfig is not None:
config.update(self.task_config.QueryJobConfig.to_api_repr()["query"])
s = Struct()
s.update(config)
return s
return json_format.MessageToDict(s)

def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
Expand Down

0 comments on commit aab79fb

Please sign in to comment.