Skip to content
This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Commit

Permalink
Merge pull request #331 from GoogleCloudPlatform/pygcp/issue204
Browse files Browse the repository at this point in the history
Add support for table decorating methods (snapshot() and window()).
  • Loading branch information
Graham Wheeler committed May 27, 2015
2 parents 9d1f5a2 + 0a2f013 commit 638560e
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 27 deletions.
15 changes: 8 additions & 7 deletions sources/sdk/pygcp/gcp/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,26 @@ def datasetname(project_id, dataset_id):
""" Construct a DataSetName named tuple.
Args:
project_id: the project ID
dataset_id: the dataset ID
project_id: the project ID.
dataset_id: the dataset ID.
Returns:
A DataSetName named-tuple.
"""
return _DataSetName(project_id, dataset_id)


def tablename(project_id, dataset_id, table_id):
def tablename(project_id, dataset_id, table_id, decorator=''):
""" Construct a TableName named tuple.
Args:
project_id: the project ID
dataset_id: the dataset ID
table_id: tha Table ID
project_id: the project ID.
dataset_id: the dataset ID.
table_id: tha Table ID.
decorator: the decorator part.
Returns:
A TableName named-tuple.
"""
return _TableName(project_id, dataset_id, table_id)
return _TableName(project_id, dataset_id, table_id, decorator)


def table(name, context=None):
Expand Down
99 changes: 97 additions & 2 deletions sources/sdk/pygcp/gcp/bigquery/_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import codecs
import csv
from datetime import datetime
from datetime import datetime, timedelta
import math
import pandas as pd
import time
Expand Down Expand Up @@ -288,6 +288,9 @@ class Table(object):
# When fetching table contents, the max number of rows to fetch per HTTP request
_DEFAULT_PAGE_SIZE = 1024

# Milliseconds per week
_MSEC_PER_WEEK = 7 * 24 * 3600 * 1000

def __init__(self, api, name):
"""Initializes an instance of a Table object.
Expand All @@ -297,7 +300,7 @@ def __init__(self, api, name):
"""
self._api = api
self._name_parts = _parse_table_name(name, api.project_id)
self._full_name = '%s:%s.%s' % self._name_parts
self._full_name = '%s:%s.%s%s' % self._name_parts
self._info = None
self._cached_page = None
self._cached_page_index = 0
Expand Down Expand Up @@ -905,5 +908,97 @@ def __getitem__(self, item):

return self._cached_page[item - self._cached_page_index]

@staticmethod
def _convert_decorator_time(when):
if isinstance(when, datetime):
value = 1000 * (when - datetime.utcfromtimestamp(0)).total_seconds()
elif isinstance(when, timedelta):
value = when.total_seconds() * 1000
if value > 0:
raise Exception("Invalid snapshot relative when argument: %s" % str(when))
else:
raise Exception("Invalid snapshot when argument type: %s" % str(when))

if value < -Table._MSEC_PER_WEEK:
raise Exception("Invalid snapshot relative when argument: must be within 7 days: %s"
% str(when))

if value > 0:
now = 1000 * (datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds()
# Check that an abs value is not more than 7 days in the past and is
# not in the future
if not ((now - Table._MSEC_PER_WEEK) < value < now):
raise Exception("Invalid snapshot absolute when argument: %s" % str(when))

return int(value)

def snapshot(self, at):
""" Return a new Table which is a snapshot of this table at the specified time.
Args:
at: the time of the snapshot. This can be a Python datetime (absolute) or timedelta
(relative to current time). The result must be after the table was created and no more
than seven days in the past. Passing None will get a reference the oldest snapshot.
Note that using a datetime will get a snapshot at an absolute point in time, while
a timedelta will provide a varying snapshot; any queries issued against such a Table
will be done against a snapshot that has an age relative to the execution time of the
query.
Returns:
A new Table object referencing the snapshot.
Raises:
An exception if this Table is already decorated, or if the time specified is invalid.
"""
if self._name_parts.decorator != '':
raise Exception("Cannot use snapshot() on an already decorated table")

value = Table._convert_decorator_time(at)
return Table(self._api, "%s@%s" % (self.full_name, str(value)))

def window(self, begin, end=None):
""" Return a new Table limited to the rows added to this Table during the specified time range.
Args:
begin: the start time of the window. This can be a Python datetime (absolute) or timedelta
(relative to current time). The result must be after the table was created and no more
than seven days in the past.
Note that using a relative value will provide a varying snapshot, not a fixed
snapshot; any queries issued against such a Table will be done against a snapshot
that has an age relative to the execution time of the query.
end: the end time of the snapshot; if None, then the current time is used. The types and
interpretation of values is as for start.
Returns:
A new Table object referencing the window.
Raises:
An exception if this Table is already decorated, or if the time specified is invalid.
"""
if self._name_parts.decorator != '':
raise Exception("Cannot use window() on an already decorated table")

start = Table._convert_decorator_time(begin)
if end is None:
if isinstance(begin, timedelta):
end = timedelta(0)
else:
end = datetime.utcnow()
stop = Table._convert_decorator_time(end)

# Both values must have the same sign
if (start > 0 >= stop) or (stop > 0 >= start):
raise Exception("window: Between arguments must both be absolute or relative: %s, %s" %
(str(begin), str(end)))

# start must be less than stop
if start > stop:
raise Exception("window: Between arguments: begin must be before end: %s, %s" %
(str(begin), str(end)))

return Table(self._api, "%s@%s-%s" % (self.full_name, str(start), str(stop)))

from ._query import Query as _Query
33 changes: 20 additions & 13 deletions sources/sdk/pygcp/gcp/bigquery/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@


DataSetName = collections.namedtuple('DataSetName', ['project_id', 'dataset_id'])
TableName = collections.namedtuple('TableName', ['project_id', 'dataset_id', 'table_id'])
TableName = collections.namedtuple('TableName',
['project_id', 'dataset_id', 'table_id', 'decorator'])

# Absolute project-qualified name pattern: <project>:<dataset>
_ABS_DATASET_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)$'
_ABS_DATASET_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)$'

# Relative name pattern: <dataset>
_REL_DATASET_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
_REL_DATASET_NAME_PATTERN = r'^(\w+)$'

# Absolute project-qualified name pattern: <project>:<dataset>.<table>
_ABS_TABLE_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
_ABS_TABLE_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)\.(\w+)(@[\d\-]+)?$'

# Relative name pattern: <dataset>.<table>
_REL_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
_REL_TABLE_NAME_PATTERN = r'^(\w+)\.(\w+)(@[\d\-]+)?$'

# Table-only name pattern: <table>
_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
# Table-only name pattern: <table>. Includes an optional decorator.
_TABLE_NAME_PATTERN = r'^(\w+)(@[\d\-]+)$'


def parse_dataset_name(name, project_id=None):
Expand Down Expand Up @@ -99,24 +100,26 @@ def parse_table_name(name, project_id=None, dataset_id=None):
Raises:
Exception: raised if the name doesn't match the expected formats.
"""
_project_id = _dataset_id = _table_id = None
_project_id = _dataset_id = _table_id = _decorator = None
if isinstance(name, basestring):
# Try to parse as absolute name first.
m = re.match(_ABS_TABLE_NAME_PATTERN, name, re.IGNORECASE)
if m is not None:
_project_id, _dataset_id, _table_id = m.groups()
_project_id, _dataset_id, _table_id, _decorator = m.groups()
else:
# Next try to match as a relative name implicitly scoped within current project.
m = re.match(_REL_TABLE_NAME_PATTERN, name)
if m is not None:
groups = m.groups()
_project_id, _dataset_id, _table_id = project_id, groups[0], groups[1]
_project_id, _dataset_id, _table_id, _decorator =\
project_id, groups[0], groups[1], groups[2]
else:
# Finally try to match as a table name only.
m = re.match(_TABLE_NAME_PATTERN, name)
if m is not None:
groups = m.groups()
_project_id, _dataset_id, _table_id = project_id, dataset_id, groups[0]
_project_id, _dataset_id, _table_id, _decorator =\
project_id, dataset_id, groups[0], groups[1]
elif isinstance(name, dict):
try:
_table_id = name['table_id']
Expand All @@ -126,7 +129,9 @@ def parse_table_name(name, project_id=None, dataset_id=None):
pass
else:
# Try treat as an array or tuple
if len(name) == 3:
if len(name) == 4:
_project_id, _dataset_id, _table_id, _decorator = name
elif len(name) == 3:
_project_id, _dataset_id, _table_id = name
elif len(name) == 2:
_dataset_id, _table_id = name
Expand All @@ -136,5 +141,7 @@ def parse_table_name(name, project_id=None, dataset_id=None):
_project_id = project_id
if not _dataset_id:
_dataset_id = dataset_id
if not _decorator:
_decorator = ''

return TableName(_project_id, _dataset_id, _table_id)
return TableName(_project_id, _dataset_id, _table_id, _decorator)
88 changes: 83 additions & 5 deletions sources/sdk/pygcp/tests/bq_table_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def _check_name_parts(self, table):
self.assertEqual('test', parsed_name[0])
self.assertEqual('requestlogs', parsed_name[1])
self.assertEqual('today', parsed_name[2])
self.assertEqual('', parsed_name[3])
self.assertEqual('[test:requestlogs.today]', table._repr_sql_())

def test_parse_full_name(self):
Expand All @@ -47,7 +48,7 @@ def test_parse_dict_local_name(self):
self._check_name_parts(table)

def test_parse_named_tuple_name(self):
table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today'))
table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today', ''))
self._check_name_parts(table)

def test_parse_tuple_full_name(self):
Expand Down Expand Up @@ -316,7 +317,7 @@ def test_insertAll_dataframe(self,

result = table.insertAll(df)
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
Expand Down Expand Up @@ -356,7 +357,7 @@ def test_insertAll_dictlist(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
])
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
Expand Down Expand Up @@ -396,7 +397,7 @@ def test_insertAll_dictlist_index(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
], include_index=True)
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Index': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Index': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Index': 2}},
Expand Down Expand Up @@ -436,7 +437,7 @@ def test_insertAll_dictlist_named_index(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
], include_index=True, index_name='Row')
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Row': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Row': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Row': 2}},
Expand Down Expand Up @@ -493,6 +494,83 @@ def test_encode_dict_as_row(self):
row = gcp.bigquery._Table._encode_dict_as_row({'fo@o': 'b@r', 'b+ar': when}, {})
self.assertEqual({'foo': 'b@r', 'bar': '2001-02-03T04:05:06.000007'}, row)

def test_decorators(self):
tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())
tbl2 = tbl.snapshot(dt.timedelta(hours=-1))
self.assertEquals('test:testds.testTable0@-3600000', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl2.snapshot(dt.timedelta(hours=-2))
self.assertEqual('Cannot use snapshot() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl2.window(dt.timedelta(hours=-2), 0)
self.assertEqual('Cannot use window() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=-8))
self.assertEqual('Invalid snapshot relative when argument: must be within 7 days: -8 days, 0:00:00',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=-8))
self.assertEqual('Invalid snapshot relative when argument: must be within 7 days: -8 days, 0:00:00',
error.exception[0])

tbl2 = tbl.snapshot(dt.timedelta(days=-1))
self.assertEquals('test:testds.testTable0@-86400000', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=1))
self.assertEqual('Invalid snapshot relative when argument: 1 day, 0:00:00',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(1000)
self.assertEqual('Invalid snapshot when argument type: 1000',
error.exception[0])

when = dt.datetime.utcnow() - dt.timedelta(1)
self.assertEquals('test:testds.testTable0@-86400000', tbl2.full_name)

when = dt.datetime.utcnow() + dt.timedelta(1)
with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(when)
self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
error.exception[0])

when = dt.datetime.utcnow() - dt.timedelta(8)
with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(when)
self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
error.exception[0])

def test_window_decorators(self):
# The at test above already tests many of the conversion cases. The extra things we
# have to test are that we can use two values, we get a meaningful default for the second
# if we pass None, and that the first time comes before the second.
tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())

tbl2 = tbl.window(dt.timedelta(hours=-1))
self.assertEquals('test:testds.testTable0@-3600000-0', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl2.window(-400000, 0)
self.assertEqual('Cannot use window() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl2.snapshot(-400000)
self.assertEqual('Cannot use snapshot() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.window(dt.timedelta(0), dt.timedelta(hours=-1))
self.assertEqual('window: Between arguments: begin must be before end: 0:00:00, -1 day, 23:00:00',
error.exception[0])

def _create_context(self):
project_id = 'test'
creds = AccessTokenCredentials('test_token', 'test_ua')
Expand Down

0 comments on commit 638560e

Please sign in to comment.