Implementation of DB-API for BigQuery. #2921
apilevel = "2.0"

# Threads may share the module, but not connections.
class Connection(object):
    """Connection to Google BigQuery.
    return None

rows, _, page_token = self._query_results.fetch_data(
    max_results=1, page_token=self._page_token)
# infer types from parameter inputs.
query_job = client.run_async_query(job_id, operation)
query_job.use_legacy_sql = False
query_job.begin()
self._has_fetched_all_rows = False
client = self.connection._client
job_id = str(uuid.uuid4())
# TODO: parameters: if not ``None``, check if ``dict`` or sequence and
if self._has_fetched_all_rows:
    return None

rows, _, page_token = self._query_results.fetch_data(
@@ -0,0 +1,49 @@
# Copyright 2016 Google Inc.
Thanks for the effort! I want to use BigQuery with Superset via SQLAlchemy, and I believe this is a good starting point. I'm willing to help out if needed!
if job.state == 'DONE':
    if job.error_result:
        # TODO: raise a more specific exception, based on the error.
        # See: https://cloud.google.com/bigquery/troubleshooting-errors
def close(self):
    """No-op."""
    pass
    pass

def _set_description(self, schema):
    """Set description from schema."""
for field in schema:
    desc.append(tuple([
        field.name,
        None,
    self.description = tuple(desc)

def execute(self, operation):
    """Prepare and execute a database operation."""
query_job.begin()
_helpers.wait_for_job(query_job)
self._query_results = query_job.results()
_, total_rows, _ = self._query_results.fetch_data(max_results=0)
    self._has_fetched_all_rows = True

self._page_token = page_token
return rows[0]
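For readers following the `fetchone()` excerpts above, the paging logic can be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeQueryResults` is a hypothetical in-memory stand-in for the real query results object, the `page_token is None` check is inferred from the excerpt rather than shown in it, and the empty-page guard is an addition that is not in the diff.

```python
class FakeQueryResults(object):
    """Hypothetical stand-in that pages through an in-memory list of rows."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetch_data(self, max_results=None, page_token=None):
        # Mimics the (rows, total_rows, page_token) triple used in the diff.
        start = int(page_token) if page_token is not None else 0
        end = len(self._rows) if max_results is None else start + max_results
        page = self._rows[start:end]
        next_token = str(end) if end < len(self._rows) else None
        return page, len(self._rows), next_token


class Cursor(object):
    """Sketch of a cursor that fetches one row per request."""

    def __init__(self, query_results):
        self._query_results = query_results
        self._page_token = None
        self._has_fetched_all_rows = False

    def fetchone(self):
        if self._has_fetched_all_rows:
            return None

        rows, _, page_token = self._query_results.fetch_data(
            max_results=1, page_token=self._page_token)

        # Assumption: a missing page token means there are no more pages.
        if page_token is None:
            self._has_fetched_all_rows = True

        self._page_token = page_token
        # Guard (not in the diff): an empty result set has no first row.
        if not rows:
            return None
        return rows[0]
```

With two rows loaded, successive `fetchone()` calls return each row in turn and then `None`, which is the behavior the system test in this PR asserts.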
bigquery/tests/system.py
Outdated
self.assertEqual(len(row), 1)
self.assertEqual(row[0], example['expected'])
row = Config.CURSOR.fetchone()
self.assertIsNone(row)
from google.cloud.bigquery import Client
from google.cloud.bigquery.dbapi import connect
from google.cloud.bigquery.dbapi import Connection
connection = connect()
from google.cloud.bigquery.dbapi import Cursor
connection = connect(_Client())
cursor = connection.cursor()
row = cursor.fetchone()
Hold off on reviewing. I still need to address a few things from your last review and implement query parameters.
I've pushed a new commit. It should be ready to review. (I'll be adding a couple of extra unit tests to make the coverage report happy, but with the integration tests, I'm pretty confident this is working.) I believe I've addressed most of your comments. I'll file issues for the TODOs once we're confident the PR won't change much before merging.
Coverage back @ 100%. @jonparrott PTAL
or deprecation policy.
"""

from google.cloud.bigquery.dbapi.connection import connect  # noqa
apilevel = "2.0"

# Threads may share the module, but not connections.
threadsafety = 1
def scalar_to_query_parameter(name=None, value=None):
    """Convert a scalar value into a query parameter.

    Note: the bytes type cannot be distinguished from a string in Python 2.
for value in parameters:
    query_parameters.append(scalar_to_query_parameter(value=value))

return query_parameters
    value = parameters[name]
    query_parameters.append(scalar_to_query_parameter(name, value))

return query_parameters
    self.rowcount = total_rows

def _format_operation_list(self, operation, parameters):
    """Formats parameters in operation in way BigQuery expects.
        raise exceptions.ProgrammingError(ex)

def _format_operation_dict(self, operation, parameters):
    """Formats parameters in operation in way BigQuery expects.
Timestamp = datetime.datetime
DateFromTicks = datetime.date.fromtimestamp
TimestampFromTicks = datetime.datetime.fromtimestamp
Binary = bytes
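For context, PEP 249 asks a DB-API module to expose a small set of type constructors. The sketch below shows one way the full set could look; only `Timestamp`, `DateFromTicks`, `TimestampFromTicks`, and `Binary` appear in the excerpt above, while `Date`, `Time`, and `TimeFromTicks` are filled in here from the PEP 249 list and are assumptions about this PR.

```python
import datetime

# Shown in the diff excerpt.
Timestamp = datetime.datetime
DateFromTicks = datetime.date.fromtimestamp
TimestampFromTicks = datetime.datetime.fromtimestamp
Binary = bytes

# Assumed here: the remaining constructors named by PEP 249.
Date = datetime.date
Time = datetime.time


def TimeFromTicks(ticks):
    """Build a time object from seconds since the epoch (local time)."""
    return datetime.datetime.fromtimestamp(ticks).time()
```

Aliasing the standard library types keeps the constructors trivial; the tick-based helpers interpret the timestamp in local time, which is what `fromtimestamp` does when no timezone is passed.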
bigquery/tests/system.py
Outdated
@@ -819,6 +901,93 @@ def test_sync_query_w_query_params(self):
    self.assertEqual(len(query.rows[0]), 1)
    self.assertEqual(query.rows[0][0], example['expected'])

def test_dbapi_w_query_parameters(self):
    EXAMPLES = [
    self.assertEqual(named_parameter.type_, expected_type, msg=msg)
    self.assertEqual(named_parameter.value, value, msg=msg)

@unittest.skipIf(six.PY2, 'Bytes cannot be distinguished from string.')
d5e3c3e to 1eddf20
    return

self.description = tuple([
    Column(
I'm still reviewing (just got to `cursor.py`), but here is some high-level janitorial-type feedback:

General fixes needed "everywhere".

- Make sure the copyright year is 2017
- Use the name of the variable

      :type foo: int
      :param foo: A foo to be ``bar``-ed.

  instead of the current "everywhere" usage

      :type: int
      :param foo: A foo to be ``bar``-ed.

- Use a `:returns:` section everywhere you have an `:rtype:`
- Convert all of your "Raises ..." prose into `:raises:` Sphinx directives
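To make the docstring conventions in the list above concrete, here is a small illustrative function. The function itself (`bar`) and its parameters are hypothetical and not part of the PR; only the `:type foo:` / `:param foo:` / `:rtype:` / `:returns:` / `:raises:` layout reflects the review feedback.

```python
def bar(foo, times=2):
    """Multiply ``foo`` by ``times``.

    :type foo: int
    :param foo: A foo to be ``bar``-ed.

    :type times: int
    :param times: (Optional) Multiplier to apply.

    :rtype: int
    :returns: The product of ``foo`` and ``times``.

    :raises: :class:`TypeError` if ``foo`` is not an integer.
    """
    if not isinstance(foo, int):
        raise TypeError('foo must be an int')
    return foo * times
```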
@@ -0,0 +1,70 @@
# Copyright 2016 Google Inc.
apilevel = "2.0"

# Threads may share the module, but not connections.
threadsafety = 1
from google.cloud.bigquery.dbapi.types import STRING


apilevel = "2.0"
# Threads may share the module, but not connections.
threadsafety = 1

paramstyle = "pyformat"
@@ -0,0 +1,131 @@
# Copyright 2016 Google Inc.
:param parameters: Sequence of query parameter values.

:rtype:
    list of :class:`~google.cloud.bigquery._helpers.AbstractQueryParameter`
def to_query_parameters_dict(parameters):
    """Converts a dictionary of parameter values into query parameters.

    :type: Mapping[str, Any]
return [
    scalar_to_query_parameter(value, name=name)
    for name, value
    in six.iteritems(parameters)]
@@ -0,0 +1,56 @@
# Copyright 2016 Google Inc.
elif isinstance(value, six.binary_type):
    parameter_type = 'BYTES'
elif isinstance(value, datetime.datetime):
    parameter_type = 'TIMESTAMP' if value.tzinfo else 'DATETIME'
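The type inference in the excerpt above can be sketched as a standalone function. Only the `BYTES` and `TIMESTAMP`/`DATETIME` branches come from the diff; the other branches, the function name `infer_parameter_type`, and the use of plain Python 3 types instead of `six` are assumptions made for illustration.

```python
import datetime


def infer_parameter_type(value):
    """Guess a BigQuery standard SQL type name from a Python scalar."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return 'BOOL'
    if isinstance(value, int):
        return 'INT64'
    if isinstance(value, float):
        return 'FLOAT64'
    if isinstance(value, str):
        return 'STRING'
    if isinstance(value, bytes):
        return 'BYTES'
    # datetime must be checked before date, because datetime subclasses date.
    if isinstance(value, datetime.datetime):
        # From the diff: timezone-aware values map to TIMESTAMP.
        return 'TIMESTAMP' if value.tzinfo else 'DATETIME'
    if isinstance(value, datetime.date):
        return 'DATE'
    if isinstance(value, datetime.time):
        return 'TIME'
    raise TypeError(
        'Cannot infer a parameter type for {!r}'.format(value))
```

The ordering of the checks matters more than the individual branches: testing `int` before `bool`, or `date` before `datetime`, would silently pick the wrong type.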
I am now down to the unit tests, do I have to look at them?
internal_size=None,
precision=None,
scale=None,
null_ok=field.mode == 'NULLABLE')
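The keyword arguments above fill the seven-item column description that PEP 249 defines for `Cursor.description`. A minimal sketch of building it with a namedtuple follows; `Field` is a hypothetical stand-in for a schema field, `description_from_schema` is an illustrative name, and using `field.field_type` as the `type_code` is an assumption (an earlier excerpt in this thread passed `None`).

```python
import collections

# The seven attributes PEP 249 lists for each column description.
Column = collections.namedtuple(
    'Column',
    ['name', 'type_code', 'display_size', 'internal_size',
     'precision', 'scale', 'null_ok'])

# Hypothetical stand-in for a BigQuery schema field.
Field = collections.namedtuple('Field', ['name', 'field_type', 'mode'])


def description_from_schema(schema):
    """Build a DB-API style description tuple from schema fields."""
    return tuple(
        Column(
            name=field.name,
            type_code=field.field_type,  # assumption, see lead-in
            display_size=None,
            internal_size=None,
            precision=None,
            scale=None,
            null_ok=field.mode == 'NULLABLE')
        for field in schema)
```

A namedtuple keeps each entry usable both positionally, as PEP 249 consumers expect, and by attribute name.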
try:
    return operation % tuple(formatted_params)
except TypeError as ex:
    total_rows = num_dml_affected_rows
    self.rowcount = total_rows

def _format_operation_list(self, operation, parameters):
:type: Sequence[Any]
:param parameters: Sequence of parameter values.
"""
formatted_params = ['?' for _ in parameters]
"""
formatted_params = {}
for name in parameters:
    formatted_params[name] = '@{}'.format(name)
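Taken together, the two excerpts above translate `pyformat`-style placeholders into the positional (`?`) and named (`@name`) markers that BigQuery standard SQL uses. A rough standalone sketch is below; the local `ProgrammingError` class stands in for the package's exception, and catching `KeyError` in the dictionary variant is an assumption, since the diff excerpts only show the `TypeError` case.

```python
class ProgrammingError(Exception):
    """Stand-in for the DB-API ``ProgrammingError`` exception."""


def format_operation_list(operation, parameters):
    """Replace each ``%s`` placeholder with a positional ``?`` marker."""
    formatted_params = ['?' for _ in parameters]
    try:
        return operation % tuple(formatted_params)
    except TypeError as ex:
        # Raised when the placeholder count does not match the parameters.
        raise ProgrammingError(ex)


def format_operation_dict(operation, parameters):
    """Replace each ``%(name)s`` placeholder with an ``@name`` marker."""
    formatted_params = {}
    for name in parameters:
        formatted_params[name] = '@{}'.format(name)
    try:
        return operation % formatted_params
    except KeyError as ex:
        # Assumption: a placeholder without a matching parameter is an error.
        raise ProgrammingError(ex)
```

For example, `format_operation_dict('SELECT %(greeting)s', {'greeting': 'hi'})` yields `'SELECT @greeting'`; the values themselves are never interpolated into the SQL text and would be sent separately as query parameters.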
bigquery/tests/system.py
Outdated
'UPDATE {}.{} '
'SET greeting = \'Guten Tag\' '
'WHERE greeting = \'Hello World\''.format(
    dataset_name, table_name))
bigquery/tests/system.py
Outdated
with _NamedTemporaryFile() as temp:
    with open(temp.name, 'w') as csv_write:
        writer = csv.writer(csv_write)
        writer.writerow(('Greeting'))
bigquery/tests/system.py
Outdated
    Config.CURSOR.execute(
        example['sql'], example['query_parameters'])
except dbapi.DatabaseError as ex:
    raise dbapi.DatabaseError('{} {}'.format(ex, msg))
self.assertEqual(len(row), 1, msg=msg)
self.assertEqual(row[0], example['expected'], msg=msg)
row = Config.CURSOR.fetchone()
self.assertIsNone(row, msg=msg)
bigquery/tests/system.py
Outdated
@@ -838,7 +1009,6 @@ def test_large_query_w_public_data(self):
SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
    PUBLIC, DATASET_NAME, TABLE_NAME, LIMIT)

dataset = Config.CLIENT.dataset(DATASET_NAME, project=PUBLIC)
Oops, I think GitHub sent my review early. I haven't uploaded my fixes yet.
Okay. I just pushed my latest changes.
Ready for another review pass when you get a chance.
I believe this commit now covers all of the required implementation details in the PEP-249 DB-API specification.
- improved docstring formatting
- used namedtuple for column descriptions
Docstring formatting.
I've rebased on the latest master. Okay to merge?
The `google.cloud.bigquery.dbapi` package covers all of the required implementation details in the PEP-249 DB-API specification.
Implements `Cursor.execute()` and `Cursor.fetchone()` without support for query parameters.
Tested manually with a Jupyter notebook.
Makes progress on #2434