Skip to content
This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Add support for table decorating methods (snapshot() and window()). #331

Merged
merged 1 commit into from
May 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions sources/sdk/pygcp/gcp/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,26 @@ def datasetname(project_id, dataset_id):
""" Construct a DataSetName named tuple.

Args:
project_id: the project ID
dataset_id: the dataset ID
project_id: the project ID.
dataset_id: the dataset ID.
Returns:
A DataSetName named-tuple.
"""
return _DataSetName(project_id, dataset_id)


def tablename(project_id, dataset_id, table_id):
def tablename(project_id, dataset_id, table_id, decorator=''):
""" Construct a TableName named tuple.

Args:
project_id: the project ID
dataset_id: the dataset ID
table_id: tha Table ID
project_id: the project ID.
dataset_id: the dataset ID.
table_id: tha Table ID.
decorator: the decorator part.
Returns:
A TableName named-tuple.
"""
return _TableName(project_id, dataset_id, table_id)
return _TableName(project_id, dataset_id, table_id, decorator)


def table(name, context=None):
Expand Down
99 changes: 97 additions & 2 deletions sources/sdk/pygcp/gcp/bigquery/_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import codecs
import csv
from datetime import datetime
from datetime import datetime, timedelta
import math
import pandas as pd
import time
Expand Down Expand Up @@ -288,6 +288,9 @@ class Table(object):
# When fetching table contents, the max number of rows to fetch per HTTP request
_DEFAULT_PAGE_SIZE = 1024

# Milliseconds per week
_MSEC_PER_WEEK = 7 * 24 * 3600 * 1000

def __init__(self, api, name):
"""Initializes an instance of a Table object.

Expand All @@ -297,7 +300,7 @@ def __init__(self, api, name):
"""
self._api = api
self._name_parts = _parse_table_name(name, api.project_id)
self._full_name = '%s:%s.%s' % self._name_parts
self._full_name = '%s:%s.%s%s' % self._name_parts
self._info = None
self._cached_page = None
self._cached_page_index = 0
Expand Down Expand Up @@ -905,5 +908,97 @@ def __getitem__(self, item):

return self._cached_page[item - self._cached_page_index]

@staticmethod
def _convert_decorator_time(when):
if isinstance(when, datetime):
value = 1000 * (when - datetime.utcfromtimestamp(0)).total_seconds()
elif isinstance(when, timedelta):
value = when.total_seconds() * 1000
if value > 0:
raise Exception("Invalid snapshot relative when argument: %s" % str(when))
else:
raise Exception("Invalid snapshot when argument type: %s" % str(when))

if value < -Table._MSEC_PER_WEEK:
raise Exception("Invalid snapshot relative when argument: must be within 7 days: %s"
% str(when))

if value > 0:
now = 1000 * (datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds()
# Check that an abs value is not more than 7 days in the past and is
# not in the future
if not ((now - Table._MSEC_PER_WEEK) < value < now):
raise Exception("Invalid snapshot absolute when argument: %s" % str(when))

return int(value)

def snapshot(self, at):
""" Return a new Table which is a snapshot of this table at the specified time.

Args:
at: the time of the snapshot. This can be a Python datetime (absolute) or timedelta
(relative to current time). The result must be after the table was created and no more
than seven days in the past. Passing None will get a reference the oldest snapshot.

Note that using a datetime will get a snapshot at an absolute point in time, while
a timedelta will provide a varying snapshot; any queries issued against such a Table
will be done against a snapshot that has an age relative to the execution time of the
query.

Returns:
A new Table object referencing the snapshot.

Raises:
An exception if this Table is already decorated, or if the time specified is invalid.
"""
if self._name_parts.decorator != '':
raise Exception("Cannot use snapshot() on an already decorated table")

value = Table._convert_decorator_time(at)
return Table(self._api, "%s@%s" % (self.full_name, str(value)))

def window(self, begin, end=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Along the same lines as the arg name for snapshot, suggestion:
begin -> from
end -> to

Does it read better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"from" is a reserved word in Python so can't do that change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, begin/end it is then.

""" Return a new Table limited to the rows added to this Table during the specified time range.

Args:
begin: the start time of the window. This can be a Python datetime (absolute) or timedelta
(relative to current time). The result must be after the table was created and no more
than seven days in the past.

Note that using a relative value will provide a varying snapshot, not a fixed
snapshot; any queries issued against such a Table will be done against a snapshot
that has an age relative to the execution time of the query.

end: the end time of the snapshot; if None, then the current time is used. The types and
interpretation of values is as for start.

Returns:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns and raises docs need to be updated to refer to window.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

A new Table object referencing the window.

Raises:
An exception if this Table is already decorated, or if the time specified is invalid.
"""
if self._name_parts.decorator != '':
raise Exception("Cannot use window() on an already decorated table")

start = Table._convert_decorator_time(begin)
if end is None:
if isinstance(begin, timedelta):
end = timedelta(0)
else:
end = datetime.utcnow()
stop = Table._convert_decorator_time(end)

# Both values must have the same sign
if (start > 0 >= stop) or (stop > 0 >= start):
raise Exception("window: Between arguments must both be absolute or relative: %s, %s" %
(str(begin), str(end)))

# start must be less than stop
if start > stop:
raise Exception("window: Between arguments: begin must be before end: %s, %s" %
(str(begin), str(end)))

return Table(self._api, "%s@%s-%s" % (self.full_name, str(start), str(stop)))

from ._query import Query as _Query
33 changes: 20 additions & 13 deletions sources/sdk/pygcp/gcp/bigquery/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@


DataSetName = collections.namedtuple('DataSetName', ['project_id', 'dataset_id'])
TableName = collections.namedtuple('TableName', ['project_id', 'dataset_id', 'table_id'])
TableName = collections.namedtuple('TableName',
['project_id', 'dataset_id', 'table_id', 'decorator'])

# Absolute project-qualified name pattern: <project>:<dataset>
_ABS_DATASET_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)$'
_ABS_DATASET_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)$'

# Relative name pattern: <dataset>
_REL_DATASET_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
_REL_DATASET_NAME_PATTERN = r'^(\w+)$'

# Absolute project-qualified name pattern: <project>:<dataset>.<table>
_ABS_TABLE_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
_ABS_TABLE_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)\.(\w+)(@[\d\-]+)?$'

# Relative name pattern: <dataset>.<table>
_REL_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
_REL_TABLE_NAME_PATTERN = r'^(\w+)\.(\w+)(@[\d\-]+)?$'

# Table-only name pattern: <table>
_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
# Table-only name pattern: <table>. Includes an optional decorator.
_TABLE_NAME_PATTERN = r'^(\w+)(@[\d\-]+)$'


def parse_dataset_name(name, project_id=None):
Expand Down Expand Up @@ -99,24 +100,26 @@ def parse_table_name(name, project_id=None, dataset_id=None):
Raises:
Exception: raised if the name doesn't match the expected formats.
"""
_project_id = _dataset_id = _table_id = None
_project_id = _dataset_id = _table_id = _decorator = None
if isinstance(name, basestring):
# Try to parse as absolute name first.
m = re.match(_ABS_TABLE_NAME_PATTERN, name, re.IGNORECASE)
if m is not None:
_project_id, _dataset_id, _table_id = m.groups()
_project_id, _dataset_id, _table_id, _decorator = m.groups()
else:
# Next try to match as a relative name implicitly scoped within current project.
m = re.match(_REL_TABLE_NAME_PATTERN, name)
if m is not None:
groups = m.groups()
_project_id, _dataset_id, _table_id = project_id, groups[0], groups[1]
_project_id, _dataset_id, _table_id, _decorator =\
project_id, groups[0], groups[1], groups[2]
else:
# Finally try to match as a table name only.
m = re.match(_TABLE_NAME_PATTERN, name)
if m is not None:
groups = m.groups()
_project_id, _dataset_id, _table_id = project_id, dataset_id, groups[0]
_project_id, _dataset_id, _table_id, _decorator =\
project_id, dataset_id, groups[0], groups[1]
elif isinstance(name, dict):
try:
_table_id = name['table_id']
Expand All @@ -126,7 +129,9 @@ def parse_table_name(name, project_id=None, dataset_id=None):
pass
else:
# Try treat as an array or tuple
if len(name) == 3:
if len(name) == 4:
_project_id, _dataset_id, _table_id, _decorator = name
elif len(name) == 3:
_project_id, _dataset_id, _table_id = name
elif len(name) == 2:
_dataset_id, _table_id = name
Expand All @@ -136,5 +141,7 @@ def parse_table_name(name, project_id=None, dataset_id=None):
_project_id = project_id
if not _dataset_id:
_dataset_id = dataset_id
if not _decorator:
_decorator = ''

return TableName(_project_id, _dataset_id, _table_id)
return TableName(_project_id, _dataset_id, _table_id, _decorator)
88 changes: 83 additions & 5 deletions sources/sdk/pygcp/tests/bq_table_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def _check_name_parts(self, table):
self.assertEqual('test', parsed_name[0])
self.assertEqual('requestlogs', parsed_name[1])
self.assertEqual('today', parsed_name[2])
self.assertEqual('', parsed_name[3])
self.assertEqual('[test:requestlogs.today]', table._repr_sql_())

def test_parse_full_name(self):
Expand All @@ -47,7 +48,7 @@ def test_parse_dict_local_name(self):
self._check_name_parts(table)

def test_parse_named_tuple_name(self):
table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today'))
table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today', ''))
self._check_name_parts(table)

def test_parse_tuple_full_name(self):
Expand Down Expand Up @@ -316,7 +317,7 @@ def test_insertAll_dataframe(self,

result = table.insertAll(df)
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
Expand Down Expand Up @@ -356,7 +357,7 @@ def test_insertAll_dictlist(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
])
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
Expand Down Expand Up @@ -396,7 +397,7 @@ def test_insertAll_dictlist_index(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
], include_index=True)
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Index': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Index': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Index': 2}},
Expand Down Expand Up @@ -436,7 +437,7 @@ def test_insertAll_dictlist_named_index(self,
{u'column': 'r3', u'headers': 10.0, u'some': 3}
], include_index=True, index_name='Row')
self.assertIsNotNone(result, "insertAll should return the table object")
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
{'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Row': 0}},
{'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Row': 1}},
{'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Row': 2}},
Expand Down Expand Up @@ -493,6 +494,83 @@ def test_encode_dict_as_row(self):
row = gcp.bigquery._Table._encode_dict_as_row({'fo@o': 'b@r', 'b+ar': when}, {})
self.assertEqual({'foo': 'b@r', 'bar': '2001-02-03T04:05:06.000007'}, row)

def test_decorators(self):
tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())
tbl2 = tbl.snapshot(dt.timedelta(hours=-1))
self.assertEquals('test:testds.testTable0@-3600000', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl2.snapshot(dt.timedelta(hours=-2))
self.assertEqual('Cannot use snapshot() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl2.window(dt.timedelta(hours=-2), 0)
self.assertEqual('Cannot use window() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=-8))
self.assertEqual('Invalid snapshot relative when argument: must be within 7 days: -8 days, 0:00:00',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=-8))
self.assertEqual('Invalid snapshot relative when argument: must be within 7 days: -8 days, 0:00:00',
error.exception[0])

tbl2 = tbl.snapshot(dt.timedelta(days=-1))
self.assertEquals('test:testds.testTable0@-86400000', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(dt.timedelta(days=1))
self.assertEqual('Invalid snapshot relative when argument: 1 day, 0:00:00',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(1000)
self.assertEqual('Invalid snapshot when argument type: 1000',
error.exception[0])

when = dt.datetime.utcnow() - dt.timedelta(1)
self.assertEquals('test:testds.testTable0@-86400000', tbl2.full_name)

when = dt.datetime.utcnow() + dt.timedelta(1)
with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(when)
self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
error.exception[0])

when = dt.datetime.utcnow() - dt.timedelta(8)
with self.assertRaises(Exception) as error:
tbl2 = tbl.snapshot(when)
self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
error.exception[0])

def test_window_decorators(self):
# The at test above already tests many of the conversion cases. The extra things we
# have to test are that we can use two values, we get a meaningful default for the second
# if we pass None, and that the first time comes before the second.
tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())

tbl2 = tbl.window(dt.timedelta(hours=-1))
self.assertEquals('test:testds.testTable0@-3600000-0', tbl2.full_name)

with self.assertRaises(Exception) as error:
tbl2 = tbl2.window(-400000, 0)
self.assertEqual('Cannot use window() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl2.snapshot(-400000)
self.assertEqual('Cannot use snapshot() on an already decorated table',
error.exception[0])

with self.assertRaises(Exception) as error:
tbl2 = tbl.window(dt.timedelta(0), dt.timedelta(hours=-1))
self.assertEqual('window: Between arguments: begin must be before end: 0:00:00, -1 day, 23:00:00',
error.exception[0])

def _create_context(self):
project_id = 'test'
creds = AccessTokenCredentials('test_token', 'test_ua')
Expand Down