diff --git a/sources/sdk/pygcp/gcp/bigquery/__init__.py b/sources/sdk/pygcp/gcp/bigquery/__init__.py
index 6ec178629..95f1b2b39 100644
--- a/sources/sdk/pygcp/gcp/bigquery/__init__.py
+++ b/sources/sdk/pygcp/gcp/bigquery/__init__.py
@@ -100,25 +100,26 @@ def datasetname(project_id, dataset_id):
   """ Construct a DataSetName named tuple.
 
   Args:
-    project_id: the project ID
-    dataset_id: the dataset ID
+    project_id: the project ID.
+    dataset_id: the dataset ID.
   Returns:
     A DataSetName named-tuple.
   """
   return _DataSetName(project_id, dataset_id)
 
 
-def tablename(project_id, dataset_id, table_id):
+def tablename(project_id, dataset_id, table_id, decorator=''):
   """ Construct a TableName named tuple.
 
   Args:
-    project_id: the project ID
-    dataset_id: the dataset ID
-    table_id: tha Table ID
+    project_id: the project ID.
+    dataset_id: the dataset ID.
+    table_id: the table ID.
+    decorator: the optional table decorator.
   Returns:
     A TableName named-tuple.
   """
-  return _TableName(project_id, dataset_id, table_id)
+  return _TableName(project_id, dataset_id, table_id, decorator)
 
 
 def table(name, context=None):
diff --git a/sources/sdk/pygcp/gcp/bigquery/_table.py b/sources/sdk/pygcp/gcp/bigquery/_table.py
index dcbddc121..d3a7ab4c1 100644
--- a/sources/sdk/pygcp/gcp/bigquery/_table.py
+++ b/sources/sdk/pygcp/gcp/bigquery/_table.py
@@ -16,7 +16,7 @@
 
 import codecs
 import csv
-from datetime import datetime
+from datetime import datetime, timedelta
 import math
 import pandas as pd
 import time
@@ -288,6 +288,9 @@ class Table(object):
   # When fetching table contents, the max number of rows to fetch per HTTP request
   _DEFAULT_PAGE_SIZE = 1024
 
+  # Milliseconds per week
+  _MSEC_PER_WEEK = 7 * 24 * 3600 * 1000
+
   def __init__(self, api, name):
     """Initializes an instance of a Table object.
@@ -297,7 +300,7 @@ def __init__(self, api, name):
     """
     self._api = api
     self._name_parts = _parse_table_name(name, api.project_id)
-    self._full_name = '%s:%s.%s' % self._name_parts
+    self._full_name = '%s:%s.%s%s' % self._name_parts
     self._info = None
     self._cached_page = None
     self._cached_page_index = 0
@@ -905,5 +908,97 @@ def __getitem__(self, item):
 
     return self._cached_page[item - self._cached_page_index]
 
+  @staticmethod
+  def _convert_decorator_time(when):
+    if isinstance(when, datetime):
+      value = 1000 * (when - datetime.utcfromtimestamp(0)).total_seconds()
+    elif isinstance(when, timedelta):
+      value = when.total_seconds() * 1000
+      if value > 0:
+        raise Exception("Invalid snapshot relative when argument: %s" % str(when))
+    else:
+      raise Exception("Invalid snapshot when argument type: %s" % str(when))
+
+    if value < -Table._MSEC_PER_WEEK:
+      raise Exception("Invalid snapshot relative when argument: must be within 7 days: %s"
+                      % str(when))
+
+    if value > 0:
+      now = 1000 * (datetime.utcnow() - datetime.utcfromtimestamp(0)).total_seconds()
+      # Check that an absolute value is not more than 7 days in the past and is
+      # not in the future
+      if not ((now - Table._MSEC_PER_WEEK) < value < now):
+        raise Exception("Invalid snapshot absolute when argument: %s" % str(when))
+
+    return int(value)
+
+  def snapshot(self, at):
+    """ Return a new Table which is a snapshot of this table at the specified time.
+
+    Args:
+      at: the time of the snapshot. This can be a Python datetime (absolute) or timedelta
+          (relative to the current time). The result must be after the table was created and
+          no more than seven days in the past. Passing None will get a reference to the
+          oldest snapshot.
+
+          Note that using a datetime will get a snapshot at an absolute point in time, while
+          a timedelta will provide a varying snapshot; any queries issued against such a Table
+          will be done against a snapshot that has an age relative to the execution time of
+          the query.
+
+    Returns:
+      A new Table object referencing the snapshot.
+
+    Raises:
+      An exception if this Table is already decorated, or if the time specified is invalid.
+    """
+    if self._name_parts.decorator != '':
+      raise Exception("Cannot use snapshot() on an already decorated table")
+
+    # A decorator time of zero refers to the oldest available snapshot.
+    value = 0 if at is None else Table._convert_decorator_time(at)
+    return Table(self._api, "%s@%s" % (self.full_name, str(value)))
+
+  def window(self, begin, end=None):
+    """ Return a new Table limited to the rows added to this Table during the specified time range.
+
+    Args:
+      begin: the start time of the window. This can be a Python datetime (absolute) or
+          timedelta (relative to the current time). The result must be after the table was
+          created and no more than seven days in the past.
+
+          Note that using a relative value will provide a varying snapshot, not a fixed
+          snapshot; any queries issued against such a Table will be done against a snapshot
+          that has an age relative to the execution time of the query.
+
+      end: the end time of the window; if None, then the current time is used. The types
+          and interpretation of values are the same as for begin.
+
+    Returns:
+      A new Table object referencing the window.
+
+    Raises:
+      An exception if this Table is already decorated, or if the time specified is invalid.
+    """
+    if self._name_parts.decorator != '':
+      raise Exception("Cannot use window() on an already decorated table")
+
+    start = Table._convert_decorator_time(begin)
+    if end is None:
+      if isinstance(begin, timedelta):
+        end = timedelta(0)
+      else:
+        end = datetime.utcnow()
+    stop = Table._convert_decorator_time(end)
+
+    # Both values must have the same sign.
+    if (start > 0 >= stop) or (stop > 0 >= start):
+      raise Exception("window: begin and end must both be absolute or relative: %s, %s" %
+                      (str(begin), str(end)))
+
+    # start must be less than stop.
+    if start > stop:
+      raise Exception("window: begin must be before end: %s, %s" %
+                      (str(begin), str(end)))
+
+    return Table(self._api, "%s@%s-%s" % (self.full_name, str(start), str(stop)))
+
 
 from ._query import Query as _Query
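A note on the decorator encoding used above: datetimes are converted to milliseconds since the Unix epoch (absolute decorators), while timedeltas become non-positive millisecond offsets (relative decorators). The sketch below mirrors that math outside the class; the helper name to_decorator_msec is invented for illustration and is not part of the SDK.

from datetime import datetime, timedelta

def to_decorator_msec(when):
  # datetimes become absolute decorators: milliseconds since the Unix epoch.
  if isinstance(when, datetime):
    return int(1000 * (when - datetime.utcfromtimestamp(0)).total_seconds())
  # timedeltas become relative decorators: a non-positive millisecond offset.
  if isinstance(when, timedelta):
    return int(when.total_seconds() * 1000)
  raise ValueError('expected datetime or timedelta: %r' % (when,))

print(to_decorator_msec(timedelta(hours=-1)))   # -3600000
print(to_decorator_msec(datetime(2015, 1, 2)))  # 1420156800000

Because a relative decorator is just an offset, it is re-evaluated every time a query against the decorated table runs, which is why the docstrings call it a varying snapshot.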
diff --git a/sources/sdk/pygcp/gcp/bigquery/_utils.py b/sources/sdk/pygcp/gcp/bigquery/_utils.py
index 7b858dda3..e450216ff 100644
--- a/sources/sdk/pygcp/gcp/bigquery/_utils.py
+++ b/sources/sdk/pygcp/gcp/bigquery/_utils.py
@@ -19,22 +19,23 @@
 
 DataSetName = collections.namedtuple('DataSetName', ['project_id', 'dataset_id'])
-TableName = collections.namedtuple('TableName', ['project_id', 'dataset_id', 'table_id'])
+TableName = collections.namedtuple('TableName',
+                                   ['project_id', 'dataset_id', 'table_id', 'decorator'])
 
 # Absolute project-qualified name pattern: <project>:<dataset>
-_ABS_DATASET_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)$'
+_ABS_DATASET_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)$'
 
 # Relative name pattern: <dataset>
-_REL_DATASET_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
+_REL_DATASET_NAME_PATTERN = r'^(\w+)$'
 
 # Absolute project-qualified name pattern: <project>:<dataset>.<table>
-_ABS_TABLE_NAME_PATTERN = r'^([a-z0-9\-_\.:]+)\:([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
+_ABS_TABLE_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)\.(\w+)(@[\d\-]+)?$'
 
 # Relative name pattern: <dataset>.<table>
-_REL_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)$'
+_REL_TABLE_NAME_PATTERN = r'^(\w+)\.(\w+)(@[\d\-]+)?$'
 
-# Table-only name pattern: <table>
-_TABLE_NAME_PATTERN = r'^([a-zA-Z0-9_]+)$'
+# Table-only name pattern: <table>. Includes an optional decorator.
+_TABLE_NAME_PATTERN = r'^(\w+)(@[\d\-]+)?$'
 
 
 def parse_dataset_name(name, project_id=None):
@@ -99,24 +100,26 @@ def parse_table_name(name, project_id=None, dataset_id=None):
   Raises:
     Exception: raised if the name doesn't match the expected formats.
   """
-  _project_id = _dataset_id = _table_id = None
+  _project_id = _dataset_id = _table_id = _decorator = None
   if isinstance(name, basestring):
     # Try to parse as absolute name first.
     m = re.match(_ABS_TABLE_NAME_PATTERN, name, re.IGNORECASE)
     if m is not None:
-      _project_id, _dataset_id, _table_id = m.groups()
+      _project_id, _dataset_id, _table_id, _decorator = m.groups()
     else:
       # Next try to match as a relative name implicitly scoped within current project.
       m = re.match(_REL_TABLE_NAME_PATTERN, name)
       if m is not None:
         groups = m.groups()
-        _project_id, _dataset_id, _table_id = project_id, groups[0], groups[1]
+        _project_id, _dataset_id, _table_id, _decorator = \
+            project_id, groups[0], groups[1], groups[2]
       else:
         # Finally try to match as a table name only.
         m = re.match(_TABLE_NAME_PATTERN, name)
         if m is not None:
           groups = m.groups()
-          _project_id, _dataset_id, _table_id = project_id, dataset_id, groups[0]
+          _project_id, _dataset_id, _table_id, _decorator = \
+              project_id, dataset_id, groups[0], groups[1]
   elif isinstance(name, dict):
     try:
       _table_id = name['table_id']
@@ -126,7 +129,9 @@ def parse_table_name(name, project_id=None, dataset_id=None):
       pass
   else:
     # Try to treat as an array or tuple
-    if len(name) == 3:
+    if len(name) == 4:
+      _project_id, _dataset_id, _table_id, _decorator = name
+    elif len(name) == 3:
       _project_id, _dataset_id, _table_id = name
     elif len(name) == 2:
       _dataset_id, _table_id = name
@@ -136,5 +141,7 @@ def parse_table_name(name, project_id=None, dataset_id=None):
     _project_id = project_id
   if not _dataset_id:
     _dataset_id = dataset_id
+  if not _decorator:
+    _decorator = ''
 
-  return TableName(_project_id, _dataset_id, _table_id)
+  return TableName(_project_id, _dataset_id, _table_id, _decorator)
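Worth noting: the decorator group is optional in all three table-name patterns (hence the trailing ?), and parse_table_name() normalizes an absent decorator to the empty string. A quick standalone check of the new absolute pattern; the regex is copied from above and the sample names are invented:

import re

_ABS_TABLE_NAME_PATTERN = r'^([a-z\d\-_\.:]+)\:(\w+)\.(\w+)(@[\d\-]+)?$'

# A decorated name captures the '@...' suffix as the fourth group.
m = re.match(_ABS_TABLE_NAME_PATTERN, 'test:requestlogs.today@-3600000', re.IGNORECASE)
print(m.groups())   # ('test', 'requestlogs', 'today', '@-3600000')

# An undecorated name still matches; the optional group is simply None.
m = re.match(_ABS_TABLE_NAME_PATTERN, 'test:requestlogs.today', re.IGNORECASE)
print(m.groups())   # ('test', 'requestlogs', 'today', None)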
diff --git a/sources/sdk/pygcp/tests/bq_table_tests.py b/sources/sdk/pygcp/tests/bq_table_tests.py
index bdc0387b0..9e078fcc8 100644
--- a/sources/sdk/pygcp/tests/bq_table_tests.py
+++ b/sources/sdk/pygcp/tests/bq_table_tests.py
@@ -27,6 +27,7 @@ def _check_name_parts(self, table):
     self.assertEqual('test', parsed_name[0])
     self.assertEqual('requestlogs', parsed_name[1])
     self.assertEqual('today', parsed_name[2])
+    self.assertEqual('', parsed_name[3])
     self.assertEqual('[test:requestlogs.today]', table._repr_sql_())
 
   def test_parse_full_name(self):
@@ -47,7 +48,7 @@ def test_parse_dict_local_name(self):
     self._check_name_parts(table)
 
   def test_parse_named_tuple_name(self):
-    table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today'))
+    table = self._create_table(gcp.bigquery.tablename('test', 'requestlogs', 'today', ''))
     self._check_name_parts(table)
 
   def test_parse_tuple_full_name(self):
@@ -316,7 +317,7 @@ def test_insertAll_dataframe(self,
     result = table.insertAll(df)
 
     self.assertIsNotNone(result, "insertAll should return the table object")
-    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
+    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
       {'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
       {'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
       {'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
@@ -356,7 +357,7 @@ def test_insertAll_dictlist(self,
       {u'column': 'r3', u'headers': 10.0, u'some': 3}
     ])
     self.assertIsNotNone(result, "insertAll should return the table object")
-    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
+    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
      {'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0}},
      {'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1}},
      {'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2}},
@@ -396,7 +397,7 @@ def test_insertAll_dictlist_index(self,
       {u'column': 'r3', u'headers': 10.0, u'some': 3}
     ], include_index=True)
     self.assertIsNotNone(result, "insertAll should return the table object")
-    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
+    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
      {'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Index': 0}},
      {'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Index': 1}},
      {'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Index': 2}},
@@ -436,7 +437,7 @@ def test_insertAll_dictlist_named_index(self,
       {u'column': 'r3', u'headers': 10.0, u'some': 3}
     ], include_index=True, index_name='Row')
     self.assertIsNotNone(result, "insertAll should return the table object")
-    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0'), [
+    mock_api_tabledata_insertAll.assert_called_with(('test', 'testds', 'testTable0', ''), [
      {'insertId': '#0', 'json': {u'column': 'r0', u'headers': 10.0, u'some': 0, 'Row': 0}},
      {'insertId': '#1', 'json': {u'column': 'r1', u'headers': 10.0, u'some': 1, 'Row': 1}},
      {'insertId': '#2', 'json': {u'column': 'r2', u'headers': 10.0, u'some': 2, 'Row': 2}},
@@ -493,6 +494,83 @@ def test_encode_dict_as_row(self):
     row = gcp.bigquery._Table._encode_dict_as_row({'fo@o': 'b@r', 'b+ar': when}, {})
     self.assertEqual({'foo': 'b@r', 'bar': '2001-02-03T04:05:06.000007'}, row)
 
+  def test_decorators(self):
+    tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())
+    tbl2 = tbl.snapshot(dt.timedelta(hours=-1))
+    self.assertEqual('test:testds.testTable0@-3600000', tbl2.full_name)
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl2.snapshot(dt.timedelta(hours=-2))
+    self.assertEqual('Cannot use snapshot() on an already decorated table',
+                     error.exception[0])
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl2.window(dt.timedelta(hours=-2), 0)
+    self.assertEqual('Cannot use window() on an already decorated table',
+                     error.exception[0])
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.snapshot(dt.timedelta(days=-8))
+    self.assertEqual('Invalid snapshot relative when argument: must be within 7 days: -8 days, 0:00:00',
+                     error.exception[0])
+
+    tbl2 = tbl.snapshot(dt.timedelta(days=-1))
+    self.assertEqual('test:testds.testTable0@-86400000', tbl2.full_name)
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.snapshot(dt.timedelta(days=1))
+    self.assertEqual('Invalid snapshot relative when argument: 1 day, 0:00:00',
+                     error.exception[0])
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.snapshot(1000)
+    self.assertEqual('Invalid snapshot when argument type: 1000',
+                     error.exception[0])
+
+    # An absolute time within the last week should be accepted; the decorator becomes the
+    # corresponding milliseconds-since-epoch value.
+    when = dt.datetime.utcnow() - dt.timedelta(1)
+    tbl2 = tbl.snapshot(when)
+    self.assertTrue(tbl2.full_name.startswith('test:testds.testTable0@'))
+
+    when = dt.datetime.utcnow() + dt.timedelta(1)
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.snapshot(when)
+    self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
+                     error.exception[0])
+
+    when = dt.datetime.utcnow() - dt.timedelta(8)
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.snapshot(when)
+    self.assertEqual('Invalid snapshot absolute when argument: %s' % when,
+                     error.exception[0])
+
+  def test_window_decorators(self):
+    # The snapshot tests above already cover most of the conversion cases. The extra things
+    # we have to test are that we can use two values, that we get a meaningful default for
+    # the second if we pass None, and that the first time comes before the second.
+    tbl = gcp.bigquery.table('testds.testTable0', context=self._create_context())
+
+    tbl2 = tbl.window(dt.timedelta(hours=-1))
+    self.assertEqual('test:testds.testTable0@-3600000-0', tbl2.full_name)
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl2.window(-400000, 0)
+    self.assertEqual('Cannot use window() on an already decorated table',
+                     error.exception[0])
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl2.snapshot(-400000)
+    self.assertEqual('Cannot use snapshot() on an already decorated table',
+                     error.exception[0])
+
+    with self.assertRaises(Exception) as error:
+      tbl2 = tbl.window(dt.timedelta(0), dt.timedelta(hours=-1))
+    self.assertEqual('window: begin must be before end: 0:00:00, -1 day, 23:00:00',
+                     error.exception[0])
+
   def _create_context(self):
     project_id = 'test'
     creds = AccessTokenCredentials('test_token', 'test_ua')
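Taken together, the new surface reads naturally from client code. A hypothetical session, assuming a default context is available; the dataset and table names are invented:

import datetime as dt
import gcp.bigquery as bq

tbl = bq.table('mydataset.mytable')

# A relative snapshot: the table as it was one hour before each query runs.
hour_ago = tbl.snapshot(dt.timedelta(hours=-1))

# A window: only the rows added between two hours ago and one hour ago.
recent = tbl.window(dt.timedelta(hours=-2), dt.timedelta(hours=-1))

# An absolute snapshot pins a fixed point in time instead.
fixed = tbl.snapshot(dt.datetime.utcnow() - dt.timedelta(hours=1))

Here hour_ago is a moving one-hour-old view because relative decorators are re-evaluated at query time, while fixed always refers to the same instant; attempting to decorate hour_ago or fixed again raises, since a table may carry at most one decorator.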