Merge pull request #1205 from mtagle/insert_table
Add two methods to bigquery hook's base cursor…
criccomini committed Mar 25, 2016
2 parents 7e50742 + aa890bb commit 7a632e2
Showing 1 changed file with 87 additions and 0 deletions.
airflow/contrib/hooks/bigquery_hook.py: 87 additions & 0 deletions
@@ -495,6 +495,93 @@ def get_tabledata(self, dataset_id, table_id,
.execute()
)

    def run_table_upsert(self, dataset_id, table_resource, project_id=None):
        """
        Creates a new, empty table in the dataset; if the table already
        exists, updates the existing table instead. Since BigQuery does not
        natively allow table upserts, this is not an atomic operation.
        :param dataset_id: the dataset to upsert the table into.
        :type dataset_id: str
        :param table_resource: a table resource. See
            https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
        :type table_resource: dict
        :param project_id: the project to upsert the table into. If None,
            project will be self.project_id.
        :return: the table resource returned by the BigQuery API.
        """
        # check to see if the table exists
        table_id = table_resource['tableReference']['tableId']
        table_exists = False
        project_id = project_id if project_id is not None else self.project_id
        tables_list_resp = self.service.tables().list(projectId=project_id,
                                                      datasetId=dataset_id).execute()
        if 'tables' in tables_list_resp:
            for table in tables_list_resp['tables']:
                if table['tableReference']['tableId'] == table_id:
                    table_exists = True
                    break

        # do update if table exists
        if table_exists:
            logging.info('table %s:%s.%s exists, updating.', project_id, dataset_id, table_id)
            return self.service.tables().update(projectId=project_id,
                                                datasetId=dataset_id,
                                                tableId=table_id,
                                                body=table_resource).execute()
        # do insert if table does not exist
        else:
            logging.info('table %s:%s.%s does not exist. creating.', project_id, dataset_id, table_id)
            return self.service.tables().insert(projectId=project_id,
                                                datasetId=dataset_id,
                                                body=table_resource).execute()
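
The sketch below is not part of this commit; it illustrates how run_table_upsert might be called through the hook's cursor. The connection id, project, dataset, table name, and schema are illustrative assumptions.

# Usage sketch (illustrative, not part of this commit).
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# Minimal table resource: tableReference.tableId drives the exists-check,
# the rest describes the table to create or update.
table_resource = {
    'tableReference': {
        'projectId': 'my-project',      # placeholder
        'datasetId': 'my_dataset',      # placeholder
        'tableId': 'my_table',          # placeholder
    },
    'schema': {'fields': [{'name': 'id', 'type': 'INTEGER'}]},
}
cursor.run_table_upsert(dataset_id='my_dataset', table_resource=table_resource)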

    def run_grant_dataset_view_access(self,
                                      source_project,
                                      source_dataset,
                                      view_project,
                                      view_dataset,
                                      view_table):
"""
Grant authorized view access of a dataset to a view table.
If this view has already been granted access to the dataset, do nothing.
This method is not atomic. Running it may clobber a simultaneous update.
:param source_project: the project of the source dataset
:type source_project: str
:param source_dataset: the source dataset
:type source_dataset: str
:param view_project: the project that the view is in
:type view_project: str
:param view_dataset: the dataset that the view is in
:type view_dataset: str
:param view_table: the table of the view
:type view_table: str
:return: the datasets resource of the source dataset.
"""

        # we don't want to clobber any existing accesses, so we have to get
        # info on the dataset before we can add view access
        source_dataset_resource = self.service.datasets().get(projectId=source_project,
                                                              datasetId=source_dataset).execute()
        access = source_dataset_resource['access'] if 'access' in source_dataset_resource else []
        view_access = {'view': {'projectId': view_project,
                                'datasetId': view_dataset,
                                'tableId': view_table}}
        # check to see if the view we want to add already exists.
        if view_access not in access:
            logging.info('granting table %s:%s.%s authorized view access to %s:%s dataset.',
                         view_project, view_dataset, view_table,
                         source_project, source_dataset)
            access.append(view_access)
            return self.service.datasets().patch(projectId=source_project,
                                                 datasetId=source_dataset,
                                                 body={'access': access}).execute()
        else:
            # if view is already in access, do nothing.
            logging.info('table %s:%s.%s already has authorized view access to %s:%s dataset.',
                         view_project, view_dataset, view_table,
                         source_project, source_dataset)
            return source_dataset_resource
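
A similar sketch, also not part of this commit, shows how run_grant_dataset_view_access might be invoked; every project, dataset, and table identifier below is a placeholder assumption.

# Usage sketch (illustrative, not part of this commit).
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# Authorize view-project:view_dataset.my_view to read tables in
# source-project:source_dataset without widening other access entries.
cursor.run_grant_dataset_view_access(source_project='source-project',
                                     source_dataset='source_dataset',
                                     view_project='view-project',
                                     view_dataset='view_dataset',
                                     view_table='my_view')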


class BigQueryCursor(BigQueryBaseCursor):
"""
