From a04996c537e9d8847411fcbb1b05da5f175b339e Mon Sep 17 00:00:00 2001
From: Aravin <34178459+aravinsiva@users.noreply.github.com>
Date: Fri, 21 Aug 2020 14:49:13 -0400
Subject: [PATCH] feat: add opentelemetry tracing (#215)

* testing first trace export
* instrumenting client.py
* instrumenting job.py and adding documentation
* reconfiguring imports
* quick cleanup of unused variable
* adding more attributes in module and limiting complexity of instrumentation
* adding tests, nox and correct attribute additions in client & job
* adding tests, nox and correct attribute additions in client & job (left out of last commit)
* linting
* reformatting noxfile.py
* addressing suggested changes
* adding suggested changes
* removing print statements
* setting same version across all OT packages and other recommended changes
* suggested changes
* fixing packages issue in nox and updating documentation
* fixing module install issue
* restructuring design for testing, adding first layer of tests (some still failing)
* adding remaining client tests and all job tests
* fixing linting issues
* fixing trace not defined issue
* fixing lint issues
* fixing documentation issues and python2 testing issue
* linting and fixing coverage issues
* adding suggested changes
* linting
* adding Shawn's suggested changes
* fixing _default_span_attribute_bug
* reverting unnecessary changes
* adding more tests for all job_ref parameters
* removing dependency, ordering imports and other changes
* addressing Shawn's concerns
* adding test and suggested changes
* adding opentelemetry to setup.py and other suggested changes
* adding reasoning for not adding to [all]
* linting
* adding Tim's suggested changes

Co-authored-by: Tim Swast
---
 README.rst                                   |  38 +
 google/cloud/bigquery/client.py              | 317 +++++---
 google/cloud/bigquery/job.py                 |  71 +-
 .../cloud/bigquery/opentelemetry_tracing.py  | 122 ++++
 noxfile.py                                   |   5 +
 setup.py                                     |  11 +-
 tests/unit/test_client.py                    | 689 ++++++++++++++----
 tests/unit/test_job.py                       | 360 +++++++--
 tests/unit/test_opentelemetry_tracing.py     | 212 ++++++
 9 files changed, 1506 insertions(+), 319 deletions(-)
 create mode 100644 google/cloud/bigquery/opentelemetry_tracing.py
 create mode 100644 tests/unit/test_opentelemetry_tracing.py

diff --git a/README.rst b/README.rst
index 8f73576d6..c6bc17834 100644
--- a/README.rst
+++ b/README.rst
@@ -102,3 +102,41 @@ Perform a query
     for row in rows:
         print(row.name)
+
+Instrumenting With OpenTelemetry
+--------------------------------
+
+This library uses `OpenTelemetry`_ to output tracing data from
+API calls to BigQuery. To enable OpenTelemetry tracing in
+the BigQuery client, the following PyPI packages need to be installed:
+
+.. _OpenTelemetry: https://opentelemetry.io
+
+.. code-block:: console
+
+    pip install google-cloud-bigquery[opentelemetry] opentelemetry-exporter-google-cloud
+
+After installation, OpenTelemetry can be used in the BigQuery
+client and in BigQuery jobs. First, however, an exporter must be
+specified to determine where the trace data will be sent. An
+example of this can be found here:
+
+..
code-block:: python + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + BatchExportSpanProcessor(CloudTraceSpanExporter()) + ) + +In this example all tracing data will be published to the Google +`Cloud Trace`_ console. For more information on OpenTelemetry, please consult the `OpenTelemetry documentation`_. + +.. _OpenTelemetry documentation: https://opentelemetry-python.readthedocs.io +.. _Cloud Trace: https://cloud.google.com/trace + + + diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 52ddffe7d..fbbfda051 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -63,6 +63,7 @@ from google.cloud.bigquery.dataset import DatasetListItem from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.exceptions import PyarrowMissingWarning +from google.cloud.bigquery.opentelemetry_tracing import create_span from google.cloud.bigquery import job from google.cloud.bigquery.model import Model from google.cloud.bigquery.model import ModelReference @@ -246,8 +247,15 @@ def get_service_account_email( if project is None: project = self.project path = "/projects/%s/serviceAccount" % (project,) - - api_response = self._call_api(retry, method="GET", path=path, timeout=timeout) + span_attributes = {"path": path} + with create_span( + name="BigQuery.getServiceAccountEmail", + attributes=span_attributes, + client=self, + ): + api_response = self._call_api( + retry, method="GET", path=path, timeout=timeout + ) return api_response["email"] def list_projects( @@ -471,9 +479,13 @@ def create_dataset( data["location"] = self.location try: - api_response = self._call_api( - retry, method="POST", path=path, data=data, timeout=timeout - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.createDataset", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout + ) return Dataset.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: @@ -515,9 +527,13 @@ def create_routine( ) resource = routine.to_api_repr() try: - api_response = self._call_api( - retry, method="POST", path=path, data=resource, timeout=timeout - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.createRoutine", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="POST", path=path, data=resource, timeout=timeout + ) return Routine.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: @@ -558,13 +574,17 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY, timeout=None If the table already exists. 
""" table = _table_arg_to_table(table, default_project=self.project) - - path = "/projects/%s/datasets/%s/tables" % (table.project, table.dataset_id) + dataset_id = table.dataset_id + path = "/projects/%s/datasets/%s/tables" % (table.project, dataset_id) data = table.to_api_repr() try: - api_response = self._call_api( - retry, method="POST", path=path, data=data, timeout=timeout - ) + span_attributes = {"path": path, "dataset_id": dataset_id} + with create_span( + name="BigQuery.createTable", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout + ) return Table.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: @@ -603,10 +623,14 @@ def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY, timeout=None): dataset_ref = DatasetReference.from_string( dataset_ref, default_project=self.project ) - - api_response = self._call_api( - retry, method="GET", path=dataset_ref.path, timeout=timeout - ) + path = dataset_ref.path + span_attributes = {"path": path} + with create_span( + name="BigQuery.getDataset", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="GET", path=path, timeout=timeout + ) return Dataset.from_api_repr(api_response) def get_iam_policy( @@ -621,10 +645,13 @@ def get_iam_policy( body = {"options": {"requestedPolicyVersion": 1}} path = "{}:getIamPolicy".format(table.path) - - response = self._call_api( - retry, method="POST", path=path, data=body, timeout=timeout, - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.getIamPolicy", attributes=span_attributes, client=self + ): + response = self._call_api( + retry, method="POST", path=path, data=body, timeout=timeout, + ) return Policy.from_api_repr(response) @@ -643,10 +670,13 @@ def set_iam_policy( body["updateMask"] = updateMask path = "{}:setIamPolicy".format(table.path) - - response = self._call_api( - retry, method="POST", path=path, data=body, timeout=timeout, - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.setIamPolicy", attributes=span_attributes, client=self + ): + response = self._call_api( + retry, method="POST", path=path, data=body, timeout=timeout, + ) return Policy.from_api_repr(response) @@ -659,10 +689,13 @@ def test_iam_permissions( body = {"permissions": permissions} path = "{}:testIamPermissions".format(table.path) - - response = self._call_api( - retry, method="POST", path=path, data=body, timeout=timeout, - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.testIamPermissions", attributes=span_attributes, client=self + ): + response = self._call_api( + retry, method="POST", path=path, data=body, timeout=timeout, + ) return response @@ -691,10 +724,14 @@ def get_model(self, model_ref, retry=DEFAULT_RETRY, timeout=None): model_ref = ModelReference.from_string( model_ref, default_project=self.project ) - - api_response = self._call_api( - retry, method="GET", path=model_ref.path, timeout=timeout - ) + path = model_ref.path + span_attributes = {"path": path} + with create_span( + name="BigQuery.getModel", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="GET", path=path, timeout=timeout + ) return Model.from_api_repr(api_response) def get_routine(self, routine_ref, retry=DEFAULT_RETRY, timeout=None): @@ -724,10 +761,14 @@ def get_routine(self, routine_ref, retry=DEFAULT_RETRY, timeout=None): routine_ref = RoutineReference.from_string( 
routine_ref, default_project=self.project ) - - api_response = self._call_api( - retry, method="GET", path=routine_ref.path, timeout=timeout - ) + path = routine_ref.path + span_attributes = {"path": path} + with create_span( + name="BigQuery.getRoutine", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="GET", path=path, timeout=timeout + ) return Routine.from_api_repr(api_response) def get_table(self, table, retry=DEFAULT_RETRY, timeout=None): @@ -754,9 +795,14 @@ def get_table(self, table, retry=DEFAULT_RETRY, timeout=None): A ``Table`` instance. """ table_ref = _table_arg_to_table_ref(table, default_project=self.project) - api_response = self._call_api( - retry, method="GET", path=table_ref.path, timeout=timeout - ) + path = table_ref.path + span_attributes = {"path": path} + with create_span( + name="BigQuery.getTable", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, method="GET", path=path, timeout=timeout + ) return Table.from_api_repr(api_response) def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY, timeout=None): @@ -793,14 +839,20 @@ def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY, timeout=None): headers = {"If-Match": dataset.etag} else: headers = None - api_response = self._call_api( - retry, - method="PATCH", - path=dataset.path, - data=partial, - headers=headers, - timeout=timeout, - ) + path = dataset.path + span_attributes = {"path": path, "fields": fields} + + with create_span( + name="BigQuery.updateDataset", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, + method="PATCH", + path=path, + data=partial, + headers=headers, + timeout=timeout, + ) return Dataset.from_api_repr(api_response) def update_model(self, model, fields, retry=DEFAULT_RETRY, timeout=None): @@ -836,14 +888,20 @@ def update_model(self, model, fields, retry=DEFAULT_RETRY, timeout=None): headers = {"If-Match": model.etag} else: headers = None - api_response = self._call_api( - retry, - method="PATCH", - path=model.path, - data=partial, - headers=headers, - timeout=timeout, - ) + path = model.path + span_attributes = {"path": path, "fields": fields} + + with create_span( + name="BigQuery.updateModel", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, + method="PATCH", + path=path, + data=partial, + headers=headers, + timeout=timeout, + ) return Model.from_api_repr(api_response) def update_routine(self, routine, fields, retry=DEFAULT_RETRY, timeout=None): @@ -890,14 +948,20 @@ def update_routine(self, routine, fields, retry=DEFAULT_RETRY, timeout=None): # TODO: remove when routines update supports partial requests. 
partial["routineReference"] = routine.reference.to_api_repr() - api_response = self._call_api( - retry, - method="PUT", - path=routine.path, - data=partial, - headers=headers, - timeout=timeout, - ) + path = routine.path + span_attributes = {"path": path, "fields": fields} + + with create_span( + name="BigQuery.updateRoutine", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, + method="PUT", + path=path, + data=partial, + headers=headers, + timeout=timeout, + ) return Routine.from_api_repr(api_response) def update_table(self, table, fields, retry=DEFAULT_RETRY, timeout=None): @@ -933,14 +997,21 @@ def update_table(self, table, fields, retry=DEFAULT_RETRY, timeout=None): headers = {"If-Match": table.etag} else: headers = None - api_response = self._call_api( - retry, - method="PATCH", - path=table.path, - data=partial, - headers=headers, - timeout=timeout, - ) + + path = table.path + span_attributes = {"path": path, "fields": fields} + + with create_span( + name="BigQuery.updateTable", attributes=span_attributes, client=self + ): + api_response = self._call_api( + retry, + method="PATCH", + path=path, + data=partial, + headers=headers, + timeout=timeout, + ) return Table.from_api_repr(api_response) def list_models( @@ -1183,17 +1254,24 @@ def delete_dataset( raise TypeError("dataset must be a Dataset or a DatasetReference") params = {} + path = dataset.path if delete_contents: params["deleteContents"] = "true" + span_attributes = {"path": path, "deleteContents": delete_contents} + else: + span_attributes = {"path": path} try: - self._call_api( - retry, - method="DELETE", - path=dataset.path, - query_params=params, - timeout=timeout, - ) + with create_span( + name="BigQuery.deleteDataset", attributes=span_attributes, client=self + ): + self._call_api( + retry, + method="DELETE", + path=path, + query_params=params, + timeout=timeout, + ) except google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1231,8 +1309,13 @@ def delete_model( if not isinstance(model, (Model, ModelReference)): raise TypeError("model must be a Model or a ModelReference") + path = model.path try: - self._call_api(retry, method="DELETE", path=model.path, timeout=timeout) + span_attributes = {"path": path} + with create_span( + name="BigQuery.deleteModel", attributes=span_attributes, client=self + ): + self._call_api(retry, method="DELETE", path=path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1268,12 +1351,17 @@ def delete_routine( routine = RoutineReference.from_string( routine, default_project=self.project ) + path = routine.path if not isinstance(routine, (Routine, RoutineReference)): raise TypeError("routine must be a Routine or a RoutineReference") try: - self._call_api(retry, method="DELETE", path=routine.path, timeout=timeout) + span_attributes = {"path": path} + with create_span( + name="BigQuery.deleteRoutine", attributes=span_attributes, client=self + ): + self._call_api(retry, method="DELETE", path=path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1310,7 +1398,12 @@ def delete_table( raise TypeError("Unable to get TableReference for table '{}'".format(table)) try: - self._call_api(retry, method="DELETE", path=table.path, timeout=timeout) + path = table.path + span_attributes = {"path": path} + with create_span( + name="BigQuery.deleteTable", attributes=span_attributes, client=self + ): + self._call_api(retry, method="DELETE", path=path, timeout=timeout) except 
google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1358,9 +1451,18 @@ def _get_query_results( # This call is typically made in a polling loop that checks whether the # job is complete (from QueryJob.done(), called ultimately from # QueryJob.result()). So we don't need to poll here. - resource = self._call_api( - retry, method="GET", path=path, query_params=extra_params, timeout=timeout - ) + span_attributes = {"path": path} + + with create_span( + name="BigQuery.getQueryResults", attributes=span_attributes, client=self + ): + resource = self._call_api( + retry, + method="GET", + path=path, + query_params=extra_params, + timeout=timeout, + ) return _QueryResults.from_api_repr(resource) def job_from_resource(self, resource): @@ -1504,9 +1606,18 @@ def get_job( path = "/projects/{}/jobs/{}".format(project, job_id) - resource = self._call_api( - retry, method="GET", path=path, query_params=extra_params, timeout=timeout - ) + span_attributes = {"path": path, "job_id": job_id, "location": location} + + with create_span( + name="BigQuery.getJob", attributes=span_attributes, client=self + ): + resource = self._call_api( + retry, + method="GET", + path=path, + query_params=extra_params, + timeout=timeout, + ) return self.job_from_resource(resource) @@ -1553,9 +1664,18 @@ def cancel_job( path = "/projects/{}/jobs/{}/cancel".format(project, job_id) - resource = self._call_api( - retry, method="POST", path=path, query_params=extra_params, timeout=timeout - ) + span_attributes = {"path": path, "job_id": job_id, "location": location} + + with create_span( + name="BigQuery.cancelJob", attributes=span_attributes, client=self + ): + resource = self._call_api( + retry, + method="POST", + path=path, + query_params=extra_params, + timeout=timeout, + ) return self.job_from_resource(resource["job"]) @@ -2730,14 +2850,15 @@ def insert_rows_json( if template_suffix is not None: data["templateSuffix"] = template_suffix + path = "%s/insertAll" % table.path # We can always retry, because every row has an insert ID. - response = self._call_api( - retry, - method="POST", - path="%s/insertAll" % table.path, - data=data, - timeout=timeout, - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.insertRowsJson", attributes=span_attributes, client=self + ): + response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout, + ) errors = [] for error in response.get("insertErrors", ()): diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 753307b2a..a8e0c25ed 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -34,6 +34,7 @@ from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.external_config import HivePartitioningOptions +from google.cloud.bigquery.opentelemetry_tracing import create_span from google.cloud.bigquery import _helpers from google.cloud.bigquery.query import _query_param_from_api_repr from google.cloud.bigquery.query import ArrayQueryParameter @@ -634,9 +635,17 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): # jobs.insert is idempotent because we ensure that every new # job has an ID. 
- api_response = client._call_api( - retry, method="POST", path=path, data=self.to_api_repr(), timeout=timeout - ) + span_attributes = {"path": path} + with create_span( + name="BigQuery.job.begin", attributes=span_attributes, job_ref=self + ): + api_response = client._call_api( + retry, + method="POST", + path=path, + data=self.to_api_repr(), + timeout=timeout, + ) self._set_properties(api_response) def exists(self, client=None, retry=DEFAULT_RETRY, timeout=None): @@ -665,13 +674,17 @@ def exists(self, client=None, retry=DEFAULT_RETRY, timeout=None): extra_params["location"] = self.location try: - client._call_api( - retry, - method="GET", - path=self.path, - query_params=extra_params, - timeout=timeout, - ) + span_attributes = {"path": self.path} + with create_span( + name="BigQuery.job.exists", attributes=span_attributes, job_ref=self + ): + client._call_api( + retry, + method="GET", + path=self.path, + query_params=extra_params, + timeout=timeout, + ) except NotFound: return False else: @@ -698,14 +711,17 @@ def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None): extra_params = {} if self.location: extra_params["location"] = self.location - - api_response = client._call_api( - retry, - method="GET", - path=self.path, - query_params=extra_params, - timeout=timeout, - ) + span_attributes = {"path": self.path} + with create_span( + name="BigQuery.job.reload", attributes=span_attributes, job_ref=self + ): + api_response = client._call_api( + retry, + method="GET", + path=self.path, + query_params=extra_params, + timeout=timeout, + ) self._set_properties(api_response) def cancel(self, client=None, retry=DEFAULT_RETRY, timeout=None): @@ -732,13 +748,18 @@ def cancel(self, client=None, retry=DEFAULT_RETRY, timeout=None): if self.location: extra_params["location"] = self.location - api_response = client._call_api( - retry, - method="POST", - path="{}/cancel".format(self.path), - query_params=extra_params, - timeout=timeout, - ) + path = "{}/cancel".format(self.path) + span_attributes = {"path": path} + with create_span( + name="BigQuery.job.cancel", attributes=span_attributes, job_ref=self + ): + api_response = client._call_api( + retry, + method="POST", + path=path, + query_params=extra_params, + timeout=timeout, + ) self._set_properties(api_response["job"]) # The Future interface requires that we return True if the *attempt* # to cancel was successful. diff --git a/google/cloud/bigquery/opentelemetry_tracing.py b/google/cloud/bigquery/opentelemetry_tracing.py new file mode 100644 index 000000000..f7375c346 --- /dev/null +++ b/google/cloud/bigquery/opentelemetry_tracing.py @@ -0,0 +1,122 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import logging
+from contextlib import contextmanager
+from google.api_core.exceptions import GoogleAPICallError
+
+logger = logging.getLogger(__name__)
+try:
+    from opentelemetry import trace
+    from opentelemetry.instrumentation.utils import http_status_to_canonical_code
+    from opentelemetry.trace.status import Status
+
+    HAS_OPENTELEMETRY = True
+
+except ImportError:
+    logger.info(
+        "This service is instrumented using OpenTelemetry. "
+        "OpenTelemetry could not be imported; please "
+        "add opentelemetry-api and opentelemetry-instrumentation "
+        "packages in order to get BigQuery Tracing data."
+    )
+
+    HAS_OPENTELEMETRY = False
+
+_default_attributes = {
+    "db.system": "BigQuery"
+}  # static, default values assigned to all spans
+
+
+@contextmanager
+def create_span(name, attributes=None, client=None, job_ref=None):
+    """Creates a ContextManager for a Span to be exported to the configured exporter.
+    If no configuration exists, yields None.
+
+    Args:
+        name (str): Name that will be set for the span being created.
+        attributes (Optional[dict]):
+            Additional attributes that pertain to
+            the specific API call (i.e. not a default attribute).
+        client (Optional[google.cloud.bigquery.client.Client]):
+            Pass in a Client object to extract any attributes that may be
+            relevant to it and add them to the created spans.
+        job_ref (Optional[google.cloud.bigquery.job._AsyncJob]):
+            Pass in a _AsyncJob object to extract any attributes that may be
+            relevant to it and add them to the created spans.
+
+    Yields:
+        opentelemetry.trace.Span: Yields the newly created Span.
+
+    Raises:
+        google.api_core.exceptions.GoogleAPICallError:
+            Raised if a span could not be yielded or there was an issue with
+            the call to OpenTelemetry.
+    """
+    final_attributes = _get_final_span_attributes(attributes, client, job_ref)
+    if not HAS_OPENTELEMETRY:
+        yield None
+        return
+    tracer = trace.get_tracer(__name__)
+
+    # yield new span value
+    with tracer.start_as_current_span(name=name, attributes=final_attributes) as span:
+        try:
+            yield span
+        except GoogleAPICallError as error:
+            if error.code is not None:
+                span.set_status(Status(http_status_to_canonical_code(error.code)))
+            raise
+
+
+def _get_final_span_attributes(attributes=None, client=None, job_ref=None):
+    final_attributes = {}
+    final_attributes.update(_default_attributes.copy())
+    if client:
+        client_attributes = _set_client_attributes(client)
+        final_attributes.update(client_attributes)
+    if job_ref:
+        job_attributes = _set_job_attributes(job_ref)
+        final_attributes.update(job_attributes)
+    if attributes:
+        final_attributes.update(attributes)
+    return final_attributes
+
+
+def _set_client_attributes(client):
+    return {"db.name": client.project, "location": client.location}
+
+
+def _set_job_attributes(job_ref):
+    job_attributes = {
+        "db.name": job_ref.project,
+        "location": job_ref.location,
+        "num_child_jobs": job_ref.num_child_jobs,
+        "job_id": job_ref.job_id,
+        "parent_job_id": job_ref.parent_job_id,
+        "state": job_ref.state,
+    }
+
+    job_attributes["hasErrors"] = job_ref.error_result is not None
+
+    if job_ref.created is not None:
+        job_attributes["timeCreated"] = job_ref.created.isoformat()
+
+    if job_ref.started is not None:
+        job_attributes["timeStarted"] = job_ref.started.isoformat()
+
+    if job_ref.ended is not None:
+        job_attributes["timeEnded"] = job_ref.ended.isoformat()
+
+    return job_attributes
diff --git a/noxfile.py b/noxfile.py
index 4664278f1..5db14c31f 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -48,6 +48,11 @@ def default(session):
     else:
         session.install("ipython")
 
+
# opentelemetry was not added to [all] because opentelemetry does not support Python 2. + # Exporter does not need to be in nox thus it has been added to README documentation + if session.python != "2.7": + session.install("-e", ".[opentelemetry]") + # Run py.test against the unit tests. session.run( "py.test", diff --git a/setup.py b/setup.py index 18bb78926..f30968364 100644 --- a/setup.py +++ b/setup.py @@ -62,15 +62,24 @@ "llvmlite<=0.34.0;python_version>='3.6'", "llvmlite<=0.31.0;python_version<'3.6'", ], + "opentelemetry": [ + "opentelemetry-api==0.9b0", + "opentelemetry-sdk==0.9b0", + "opentelemetry-instrumentation==0.9b0 ", + ], } all_extras = [] for extra in extras: - if extra == "fastparquet": + if extra in ( # Skip fastparquet from "all" because it is redundant with pyarrow and # creates a dependency on pre-release versions of numpy. See: # https://github.com/googleapis/google-cloud-python/issues/8549 + "fastparquet", + # Skip opentelemetry because the library is not compatible with Python 2. + "opentelemetry", + ): continue all_extras.extend(extras[extra]) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 8b63f7e57..271640dd5 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -40,6 +40,16 @@ import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None +try: + import opentelemetry + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) +except (ImportError, AttributeError): + opentelemetry = None try: import pyarrow except (ImportError, AttributeError): # pragma: NO COVER @@ -246,20 +256,25 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection() - + path = "/projects/other-project/queries/nothere" with self.assertRaises(NotFound): - client._get_query_results( - "nothere", - None, - project="other-project", - location=self.LOCATION, - timeout_ms=500, - timeout=42, - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client._get_query_results( + "nothere", + None, + project="other-project", + location=self.LOCATION, + timeout_ms=500, + timeout=42, + ) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_once_with( method="GET", - path="/projects/other-project/queries/nothere", + path=path, query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION}, timeout=42, ) @@ -315,9 +330,12 @@ def test_get_service_account_email(self): email = "bq-123@bigquery-encryption.iam.gserviceaccount.com" resource = {"kind": "bigquery#getServiceAccountResponse", "email": email} conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + service_account_email = client.get_service_account_email(timeout=7.5) - service_account_email = client.get_service_account_email(timeout=7.5) - + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_once_with(method="GET", path=path, timeout=7.5) self.assertEqual(service_account_email, email) @@ -330,9 +348,12 @@ def 
test_get_service_account_email_w_alternate_project(self): email = "bq-123@bigquery-encryption.iam.gserviceaccount.com" resource = {"kind": "bigquery#getServiceAccountResponse", "email": email} conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + service_account_email = client.get_service_account_email(project=project) - service_account_email = client.get_service_account_email(project=project) - + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_once_with(method="GET", path=path, timeout=None) self.assertEqual(service_account_email, email) @@ -357,10 +378,14 @@ def test_get_service_account_email_w_custom_retry(self): ) with api_request_patcher as fake_api_request: - service_account_email = client.get_service_account_email( - retry=retry, timeout=7.5 - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + service_account_email = client.get_service_account_email( + retry=retry, timeout=7.5 + ) + final_attributes.assert_called_once_with({"path": api_path}, client, None) self.assertEqual( service_account_email, "bq-123@bigquery-encryption.iam.gserviceaccount.com" ) @@ -612,8 +637,12 @@ def test_get_dataset(self): } conn = client._connection = make_connection(resource) dataset_ref = DatasetReference(self.PROJECT, self.DS_ID) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.get_dataset(dataset_ref, timeout=7.5) - dataset = client.get_dataset(dataset_ref, timeout=7.5) + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) conn.api_request.assert_called_once_with( method="GET", path="/%s" % path, timeout=7.5 @@ -625,35 +654,61 @@ def test_get_dataset(self): # Not a cloud API exception (missing 'errors' field). client._connection = make_connection(Exception(""), resource) with self.assertRaises(Exception): - client.get_dataset(dataset_ref) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.get_dataset(dataset_ref) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) # Zero-length errors field. client._connection = make_connection(ServerError(""), resource) with self.assertRaises(ServerError): - client.get_dataset(dataset_ref) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.get_dataset(dataset_ref) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) # Non-retryable reason. client._connection = make_connection( ServerError("", errors=[{"reason": "serious"}]), resource ) with self.assertRaises(ServerError): - client.get_dataset(dataset_ref) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.get_dataset(dataset_ref) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) # Retryable reason, but retry is disabled. 
client._connection = make_connection( ServerError("", errors=[{"reason": "backendError"}]), resource ) with self.assertRaises(ServerError): - client.get_dataset(dataset_ref, retry=None) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.get_dataset(dataset_ref, retry=None) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) # Retryable reason, default retry: success. client._connection = make_connection( ServerError("", errors=[{"reason": "backendError"}]), resource ) - dataset = client.get_dataset( - # Test with a string for dataset ID. - dataset_ref.dataset_id - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.get_dataset( + # Test with a string for dataset ID. + dataset_ref.dataset_id + ) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) + self.assertEqual(dataset.dataset_id, self.DS_ID) @unittest.skipIf( @@ -713,8 +768,12 @@ def test_create_dataset_minimal(self): ds_ref = DatasetReference(self.PROJECT, self.DS_ID) before = Dataset(ds_ref) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + after = client.create_dataset(before, timeout=7.5) - after = client.create_dataset(before, timeout=7.5) + final_attributes.assert_called_once_with({"path": "/%s" % PATH}, client, None) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -775,8 +834,12 @@ def test_create_dataset_w_attrs(self): before.default_table_expiration_ms = 3600 before.location = LOCATION before.labels = LABELS + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + after = client.create_dataset(before) - after = client.create_dataset(before) + final_attributes.assert_called_once_with({"path": "/%s" % PATH}, client, None) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -826,8 +889,12 @@ def test_create_dataset_w_custom_property(self): ds_ref = DatasetReference(self.PROJECT, self.DS_ID) before = Dataset(ds_ref) before._properties["newAlphaProperty"] = "unreleased property" + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + after = client.create_dataset(before) - after = client.create_dataset(before) + final_attributes.assert_called_once_with({"path": path}, client, None) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -865,8 +932,12 @@ def test_create_dataset_w_client_location_wo_dataset_location(self): ds_ref = DatasetReference(self.PROJECT, self.DS_ID) before = Dataset(ds_ref) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + after = client.create_dataset(before) - after = client.create_dataset(before) + final_attributes.assert_called_once_with({"path": "/%s" % PATH}, client, None) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -908,8 +979,12 @@ def test_create_dataset_w_client_location_w_dataset_location(self): ds_ref = DatasetReference(self.PROJECT, self.DS_ID) before = Dataset(ds_ref) before.location = OTHER_LOCATION + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + after 
= client.create_dataset(before) - after = client.create_dataset(before) + final_attributes.assert_called_once_with({"path": "/%s" % PATH}, client, None) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -944,8 +1019,12 @@ def test_create_dataset_w_reference(self): project=self.PROJECT, credentials=creds, location=self.LOCATION ) conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.create_dataset(DatasetReference(self.PROJECT, self.DS_ID)) - dataset = client.create_dataset(DatasetReference(self.PROJECT, self.DS_ID)) + final_attributes.assert_called_once_with({"path": path}, client, None) self.assertEqual(dataset.dataset_id, self.DS_ID) self.assertEqual(dataset.project, self.PROJECT) @@ -980,8 +1059,12 @@ def test_create_dataset_w_fully_qualified_string(self): project=self.PROJECT, credentials=creds, location=self.LOCATION ) conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.create_dataset("{}.{}".format(self.PROJECT, self.DS_ID)) - dataset = client.create_dataset("{}.{}".format(self.PROJECT, self.DS_ID)) + final_attributes.assert_called_once_with({"path": path}, client, None) self.assertEqual(dataset.dataset_id, self.DS_ID) self.assertEqual(dataset.project, self.PROJECT) @@ -1016,8 +1099,12 @@ def test_create_dataset_w_string(self): project=self.PROJECT, credentials=creds, location=self.LOCATION ) conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.create_dataset(self.DS_ID) - dataset = client.create_dataset(self.DS_ID) + final_attributes.assert_called_once_with({"path": path}, client, None) self.assertEqual(dataset.dataset_id, self.DS_ID) self.assertEqual(dataset.project, self.PROJECT) @@ -1067,8 +1154,12 @@ def test_create_dataset_alreadyexists_w_exists_ok_true(self): conn = client._connection = make_connection( google.api_core.exceptions.AlreadyExists("dataset already exists"), resource ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.create_dataset(self.DS_ID, exists_ok=True) - dataset = client.create_dataset(self.DS_ID, exists_ok=True) + final_attributes.assert_called_with({"path": get_path}, client, None) self.assertEqual(dataset.dataset_id, self.DS_ID) self.assertEqual(dataset.project, self.PROJECT) @@ -1100,6 +1191,7 @@ def test_create_routine_w_minimal_resource(self): from google.cloud.bigquery.routine import RoutineReference creds = _make_credentials() + path = "/projects/test-routine-project/datasets/test_routines/routines" resource = { "routineReference": { "projectId": "test-routine-project", @@ -1111,14 +1203,15 @@ def test_create_routine_w_minimal_resource(self): conn = client._connection = make_connection(resource) full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + actual_routine = client.create_routine(routine, timeout=7.5) - actual_routine = client.create_routine(routine, timeout=7.5) + final_attributes.assert_called_once_with({"path": path}, client, 
None) conn.api_request.assert_called_once_with( - method="POST", - path="/projects/test-routine-project/datasets/test_routines/routines", - data=resource, - timeout=7.5, + method="POST", path=path, data=resource, timeout=7.5, ) self.assertEqual( actual_routine.reference, RoutineReference.from_string(full_routine_id) @@ -1132,12 +1225,54 @@ def test_create_routine_w_conflict(self): conn = client._connection = make_connection( google.api_core.exceptions.AlreadyExists("routine already exists") ) + path = "/projects/test-routine-project/datasets/test_routines/routines" + full_routine_id = "test-routine-project.test_routines.minimal_routine" + routine = Routine(full_routine_id) + + with pytest.raises(google.api_core.exceptions.AlreadyExists): + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.create_routine(routine) + + final_attributes.assert_called_once_with({"path": path}, client, None) + + resource = { + "routineReference": { + "projectId": "test-routine-project", + "datasetId": "test_routines", + "routineId": "minimal_routine", + } + } + conn.api_request.assert_called_once_with( + method="POST", path=path, data=resource, timeout=None, + ) + + @unittest.skipIf(opentelemetry is None, "Requires `opentelemetry`") + def test_span_status_is_set(self): + from google.cloud.bigquery.routine import Routine + + tracer_provider = TracerProvider() + memory_exporter = InMemorySpanExporter() + span_processor = SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + trace.set_tracer_provider(tracer_provider) + + creds = _make_credentials() + client = self._make_one(project=self.PROJECT, credentials=creds) + conn = client._connection = make_connection( + google.api_core.exceptions.AlreadyExists("routine already exists") + ) + path = "/projects/test-routine-project/datasets/test_routines/routines" full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) with pytest.raises(google.api_core.exceptions.AlreadyExists): client.create_routine(routine) + span_list = memory_exporter.get_finished_spans() + self.assertTrue(span_list[0].status is not None) + resource = { "routineReference": { "projectId": "test-routine-project", @@ -1146,10 +1281,7 @@ def test_create_routine_w_conflict(self): } } conn.api_request.assert_called_once_with( - method="POST", - path="/projects/test-routine-project/datasets/test_routines/routines", - data=resource, - timeout=None, + method="POST", path=path, data=resource, timeout=None, ) def test_create_routine_w_conflict_exists_ok(self): @@ -1164,25 +1296,28 @@ def test_create_routine_w_conflict_exists_ok(self): "routineId": "minimal_routine", } } + path = "/projects/test-routine-project/datasets/test_routines/routines" + conn = client._connection = make_connection( google.api_core.exceptions.AlreadyExists("routine already exists"), resource ) full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + actual_routine = client.create_routine(routine, exists_ok=True) - actual_routine = client.create_routine(routine, exists_ok=True) + final_attributes.assert_called_with( + {"path": "%s/minimal_routine" % path}, client, None + ) self.assertEqual(actual_routine.project, "test-routine-project") self.assertEqual(actual_routine.dataset_id, "test_routines") 
self.assertEqual(actual_routine.routine_id, "minimal_routine") conn.api_request.assert_has_calls( [ - mock.call( - method="POST", - path="/projects/test-routine-project/datasets/test_routines/routines", - data=resource, - timeout=None, - ), + mock.call(method="POST", path=path, data=resource, timeout=None,), mock.call( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", @@ -1202,8 +1337,14 @@ def test_create_table_w_day_partition(self): conn = client._connection = make_connection(resource) table = Table(self.TABLE_REF) table.time_partitioning = TimePartitioning() + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table, timeout=7.5) - got = client.create_table(table, timeout=7.5) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1235,8 +1376,14 @@ def test_create_table_w_custom_property(self): conn = client._connection = make_connection(resource) table = Table(self.TABLE_REF) table._properties["newAlphaProperty"] = "unreleased property" + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table) - got = client.create_table(table) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1270,8 +1417,14 @@ def test_create_table_w_encryption_configuration(self): table.encryption_configuration = EncryptionConfiguration( kms_key_name=self.KMS_KEY_NAME ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table) - got = client.create_table(table) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1300,8 +1453,14 @@ def test_create_table_w_day_partition_and_expire(self): conn = client._connection = make_connection(resource) table = Table(self.TABLE_REF) table.time_partitioning = TimePartitioning(expiration_ms=100) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table) - got = client.create_table(table) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1359,7 +1518,14 @@ def test_create_table_w_schema_and_query(self): table = Table(self.TABLE_REF, schema=schema) table.view_query = query - got = client.create_table(table) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1420,7 +1586,14 @@ def test_create_table_w_external(self): ec.autodetect = True table.external_data_configuration = ec - got = client.create_table(table) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(table) + + 
final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": table.dataset_id}, client, None + ) conn.api_request.assert_called_once_with( method="POST", @@ -1454,7 +1627,16 @@ def test_create_table_w_reference(self): resource = self._make_table_resource() conn = client._connection = make_connection(resource) - got = client.create_table(self.TABLE_REF) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table(self.TABLE_REF) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": self.TABLE_REF.dataset_id}, + client, + None, + ) conn.api_request.assert_called_once_with( method="POST", @@ -1477,9 +1659,17 @@ def test_create_table_w_fully_qualified_string(self): client = self._make_one(project=self.PROJECT, credentials=creds) resource = self._make_table_resource() conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table( + "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.TABLE_ID) + ) - got = client.create_table( - "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.TABLE_ID) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": self.TABLE_REF.dataset_id}, + client, + None, ) conn.api_request.assert_called_once_with( @@ -1503,8 +1693,16 @@ def test_create_table_w_string(self): client = self._make_one(project=self.PROJECT, credentials=creds) resource = self._make_table_resource() conn = client._connection = make_connection(resource) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) - got = client.create_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "dataset_id": self.TABLE_REF.dataset_id}, + client, + None, + ) conn.api_request.assert_called_once_with( method="POST", @@ -1532,7 +1730,14 @@ def test_create_table_alreadyexists_w_exists_ok_false(self): ) with pytest.raises(google.api_core.exceptions.AlreadyExists): - client.create_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.create_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) + + final_attributes.assert_called_with( + {"path": post_path, "dataset_id": self.TABLE_REF.dataset_id}, client, None, + ) conn.api_request.assert_called_once_with( method="POST", @@ -1562,9 +1767,14 @@ def test_create_table_alreadyexists_w_exists_ok_true(self): google.api_core.exceptions.AlreadyExists("table already exists"), resource ) - got = client.create_table( - "{}.{}".format(self.DS_ID, self.TABLE_ID), exists_ok=True - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.create_table( + "{}.{}".format(self.DS_ID, self.TABLE_ID), exists_ok=True + ) + + final_attributes.assert_called_with({"path": get_path}, client, None) self.assertEqual(got.project, self.PROJECT) self.assertEqual(got.dataset_id, self.DS_ID) @@ -1619,7 +1829,12 @@ def test_get_model(self): conn = client._connection = make_connection(resource) model_ref = DatasetReference(self.PROJECT, self.DS_ID).model(self.MODEL_ID) - got = client.get_model(model_ref, timeout=7.5) + with 
mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.get_model(model_ref, timeout=7.5) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) conn.api_request.assert_called_once_with( method="GET", path="/%s" % path, timeout=7.5 @@ -1645,7 +1860,12 @@ def test_get_model_w_string(self): conn = client._connection = make_connection(resource) model_id = "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.MODEL_ID) - got = client.get_model(model_id) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + got = client.get_model(model_id) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) conn.api_request.assert_called_once_with( method="GET", path="/%s" % path, timeout=None @@ -1673,15 +1893,20 @@ def test_get_routine(self): }, "routineType": "SCALAR_FUNCTION", } + path = "/projects/test-routine-project/datasets/test_routines/routines/minimal_routine" + client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(resource) - actual_routine = client.get_routine(routine, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + actual_routine = client.get_routine(routine, timeout=7.5) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_once_with( - method="GET", - path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", - timeout=7.5, + method="GET", path=path, timeout=7.5, ) self.assertEqual( actual_routine.reference, @@ -1710,7 +1935,12 @@ def test_get_table(self): client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) resource = self._make_table_resource() conn = client._connection = make_connection(resource) - table = client.get_table(self.TABLE_REF, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + table = client.get_table(self.TABLE_REF, timeout=7.5) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) conn.api_request.assert_called_once_with( method="GET", path="/%s" % path, timeout=7.5 @@ -1786,8 +2016,12 @@ def test_get_iam_policy(self): http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = make_connection(RETURNED) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + policy = client.get_iam_policy(self.TABLE_REF, timeout=7.5) - policy = client.get_iam_policy(self.TABLE_REF, timeout=7.5) + final_attributes.assert_called_once_with({"path": PATH}, client, None) conn.api_request.assert_called_once_with( method="POST", path=PATH, data=BODY, timeout=7.5 @@ -1856,9 +2090,14 @@ def test_set_iam_policy(self): client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = make_connection(RETURNED) - returned_policy = client.set_iam_policy( - self.TABLE_REF, policy, updateMask=MASK, timeout=7.5 - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + returned_policy = client.set_iam_policy( + self.TABLE_REF, policy, updateMask=MASK, timeout=7.5 + ) + + final_attributes.assert_called_once_with({"path": PATH}, client, None) 
conn.api_request.assert_called_once_with( method="POST", path=PATH, data=BODY, timeout=7.5 @@ -1884,8 +2123,12 @@ def test_set_iam_policy_no_mask(self): http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = make_connection(RETURNED) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.set_iam_policy(self.TABLE_REF, policy, timeout=7.5) - client.set_iam_policy(self.TABLE_REF, policy, timeout=7.5) + final_attributes.assert_called_once_with({"path": PATH}, client, None) conn.api_request.assert_called_once_with( method="POST", path=PATH, data=BODY, timeout=7.5 @@ -1937,8 +2180,12 @@ def test_test_iam_permissions(self): http = object() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = make_connection(RETURNED) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.test_iam_permissions(self.TABLE_REF, PERMISSIONS, timeout=7.5) - client.test_iam_permissions(self.TABLE_REF, PERMISSIONS, timeout=7.5) + final_attributes.assert_called_once_with({"path": PATH}, client, None) conn.api_request.assert_called_once_with( method="POST", path=PATH, data=BODY, timeout=7.5 @@ -2000,11 +2247,23 @@ def test_update_dataset(self): ds.default_table_expiration_ms = EXP ds.labels = LABELS ds.access_entries = [AccessEntry("OWNER", "userByEmail", "phred@example.com")] - ds2 = client.update_dataset( - ds, - ["description", "friendly_name", "location", "labels", "access_entries"], - timeout=7.5, + fields = [ + "description", + "friendly_name", + "location", + "labels", + "access_entries", + ] + + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + ds2 = client.update_dataset(ds, fields=fields, timeout=7.5,) + + final_attributes.assert_called_once_with( + {"path": "/%s" % PATH, "fields": fields}, client, None ) + conn.api_request.assert_called_once_with( method="PATCH", data={ @@ -2046,7 +2305,15 @@ def test_update_dataset_w_custom_property(self): dataset = Dataset(DatasetReference(self.PROJECT, self.DS_ID)) dataset._properties["newAlphaProperty"] = "unreleased property" - dataset = client.update_dataset(dataset, ["newAlphaProperty"]) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + dataset = client.update_dataset(dataset, ["newAlphaProperty"]) + + final_attributes.assert_called_once_with( + {"path": path, "fields": ["newAlphaProperty"]}, client, None + ) + conn.api_request.assert_called_once_with( method="PATCH", data={"newAlphaProperty": "unreleased property"}, @@ -2093,9 +2360,14 @@ def test_update_model(self): model.friendly_name = title model.expires = expires model.labels = {"x": "y"} + fields = ["description", "friendly_name", "labels", "expires"] + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_model = client.update_model(model, fields, timeout=7.5) - updated_model = client.update_model( - model, ["description", "friendly_name", "labels", "expires"], timeout=7.5 + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": fields}, client, None ) sent = { @@ -2153,11 +2425,22 @@ def test_update_routine(self): routine.language = "SQL" routine.type_ = "SCALAR_FUNCTION" routine._properties["someNewField"] = "someValue" + 
fields = [ + "arguments", + "language", + "body", + "type_", + "return_type", + "someNewField", + ] - actual_routine = client.update_routine( - routine, - ["arguments", "language", "body", "type_", "return_type", "someNewField"], - timeout=7.5, + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + actual_routine = client.update_routine(routine, fields, timeout=7.5,) + + final_attributes.assert_called_once_with( + {"path": routine.path, "fields": fields}, client, None ) # TODO: routineReference isn't needed when the Routines API supports @@ -2177,7 +2460,15 @@ def test_update_routine(self): # ETag becomes If-Match header. routine._properties["etag"] = "im-an-etag" - client.update_routine(routine, []) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.update_routine(routine, []) + + final_attributes.assert_called_once_with( + {"path": routine.path, "fields": []}, client, None + ) + req = conn.api_request.call_args self.assertEqual(req[1]["headers"]["If-Match"], "im-an-etag") @@ -2228,9 +2519,15 @@ def test_update_table(self): table.description = description table.friendly_name = title table.labels = {"x": "y"} + fields = ["schema", "description", "friendly_name", "labels"] + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_table = client.update_table(table, fields, timeout=7.5) + span_path = "/%s" % path - updated_table = client.update_table( - table, ["schema", "description", "friendly_name", "labels"], timeout=7.5 + final_attributes.assert_called_once_with( + {"path": span_path, "fields": fields}, client, None ) sent = { @@ -2264,7 +2561,15 @@ def test_update_table(self): # ETag becomes If-Match header. 
table._properties["etag"] = "etag" - client.update_table(table, []) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.update_table(table, []) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": []}, client, None + ) + req = conn.api_request.call_args self.assertEqual(req[1]["headers"]["If-Match"], "etag") @@ -2284,7 +2589,14 @@ def test_update_table_w_custom_property(self): table = Table(self.TABLE_REF) table._properties["newAlphaProperty"] = "unreleased property" - updated_table = client.update_table(table, ["newAlphaProperty"]) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_table = client.update_table(table, ["newAlphaProperty"]) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": ["newAlphaProperty"]}, client, None, + ) conn.api_request.assert_called_once_with( method="PATCH", @@ -2312,8 +2624,14 @@ def test_update_table_only_use_legacy_sql(self): conn = client._connection = make_connection(resource) table = Table(self.TABLE_REF) table.view_use_legacy_sql = True + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_table = client.update_table(table, ["view_use_legacy_sql"]) - updated_table = client.update_table(table, ["view_use_legacy_sql"]) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": ["view_use_legacy_sql"]}, client, None, + ) conn.api_request.assert_called_once_with( method="PATCH", @@ -2376,8 +2694,14 @@ def test_update_table_w_query(self): table.view_query = query table.view_use_legacy_sql = True updated_properties = ["schema", "view_query", "expires", "view_use_legacy_sql"] + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_table = client.update_table(table, updated_properties) - updated_table = client.update_table(table, updated_properties) + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": updated_properties}, client, None, + ) self.assertEqual(updated_table.schema, table.schema) self.assertEqual(updated_table.view_query, table.view_query) @@ -2420,17 +2744,30 @@ def test_update_table_w_schema_None(self): creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(resource1, resource2) - table = client.get_table( - # Test with string for table ID - "{}.{}.{}".format( - self.TABLE_REF.project, - self.TABLE_REF.dataset_id, - self.TABLE_REF.table_id, + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + table = client.get_table( + # Test with string for table ID + "{}.{}.{}".format( + self.TABLE_REF.project, + self.TABLE_REF.dataset_id, + self.TABLE_REF.table_id, + ) ) - ) + + final_attributes.assert_called_once_with({"path": "/%s" % path}, client, None) + table.schema = None - updated_table = client.update_table(table, ["schema"]) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + updated_table = client.update_table(table, ["schema"]) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": ["schema"]}, client, None + ) self.assertEqual(len(conn.api_request.call_args_list), 2) req = conn.api_request.call_args_list[1] @@ 
-2460,11 +2797,30 @@ def test_update_table_delete_property(self): table = Table(self.TABLE_REF) table.description = description table.friendly_name = title - table2 = client.update_table(table, ["description", "friendly_name"]) + + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + table2 = client.update_table(table, ["description", "friendly_name"]) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": ["description", "friendly_name"]}, + client, + None, + ) + self.assertEqual(table2.description, table.description) table2.description = None - table3 = client.update_table(table2, ["description"]) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + table3 = client.update_table(table2, ["description"]) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path, "fields": ["description"]}, client, None + ) + self.assertEqual(len(conn.api_request.call_args_list), 2) req = conn.api_request.call_args_list[1] self.assertEqual(req[1]["method"], "PATCH") @@ -2777,7 +3133,15 @@ def test_delete_dataset(self): client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(*([{}] * len(datasets))) for arg in datasets: - client.delete_dataset(arg, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_dataset(arg, timeout=7.5) + + final_attributes.assert_called_once_with( + {"path": "/%s" % PATH}, client, None + ) + conn.api_request.assert_called_with( method="DELETE", path="/%s" % PATH, query_params={}, timeout=7.5 ) @@ -2791,7 +3155,14 @@ def test_delete_dataset_delete_contents(self): conn = client._connection = make_connection({}, {}) ds_ref = DatasetReference(self.PROJECT, self.DS_ID) for arg in (ds_ref, Dataset(ds_ref)): - client.delete_dataset(arg, delete_contents=True) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_dataset(arg, delete_contents=True) + + final_attributes.assert_called_once_with( + {"path": "/%s" % PATH, "deleteContents": True}, client, None + ) conn.api_request.assert_called_with( method="DELETE", path="/%s" % PATH, @@ -2817,7 +3188,12 @@ def test_delete_dataset_w_not_found_ok_false(self): ) with self.assertRaises(google.api_core.exceptions.NotFound): - client.delete_dataset(self.DS_ID) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_dataset(self.DS_ID) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with( method="DELETE", path=path, query_params={}, timeout=None @@ -2832,7 +3208,12 @@ def test_delete_dataset_w_not_found_ok_true(self): google.api_core.exceptions.NotFound("dataset not found") ) - client.delete_dataset(self.DS_ID, not_found_ok=True) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_dataset(self.DS_ID, not_found_ok=True) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with( method="DELETE", path=path, query_params={}, timeout=None @@ -2858,7 +3239,14 @@ def test_delete_model(self): conn = client._connection = make_connection(*([{}] * len(models))) for arg in models: - 
client.delete_model(arg, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_model(arg, timeout=7.5) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path}, client, None + ) conn.api_request.assert_called_with( method="DELETE", path="/%s" % path, timeout=7.5 ) @@ -2895,10 +3283,14 @@ def test_delete_model_w_not_found_ok_true(self): conn = client._connection = make_connection( google.api_core.exceptions.NotFound("model not found") ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_model( + "{}.{}".format(self.DS_ID, self.MODEL_ID), not_found_ok=True + ) - client.delete_model( - "{}.{}".format(self.DS_ID, self.MODEL_ID), not_found_ok=True - ) + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) @@ -2914,15 +3306,20 @@ def test_delete_routine(self): ] creds = _make_credentials() http = object() + path = "/projects/test-routine-project/datasets/test_routines/routines/minimal_routine" client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = make_connection(*([{}] * len(routines))) for routine in routines: - client.delete_routine(routine, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_routine(routine, timeout=7.5) + + final_attributes.assert_called_once_with({"path": path}, client, None) + conn.api_request.assert_called_with( - method="DELETE", - path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", - timeout=7.5, + method="DELETE", path=path, timeout=7.5, ) def test_delete_routine_w_wrong_type(self): @@ -2938,14 +3335,18 @@ def test_delete_routine_w_not_found_ok_false(self): conn = client._connection = make_connection( google.api_core.exceptions.NotFound("routine not found") ) + path = "/projects/routines-project/datasets/test_routines/routines/test_routine" with self.assertRaises(google.api_core.exceptions.NotFound): - client.delete_routine("routines-project.test_routines.test_routine") + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_routine("routines-project.test_routines.test_routine") + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with( - method="DELETE", - path="/projects/routines-project/datasets/test_routines/routines/test_routine", - timeout=None, + method="DELETE", path=path, timeout=None, ) def test_delete_routine_w_not_found_ok_true(self): @@ -2955,15 +3356,19 @@ def test_delete_routine_w_not_found_ok_true(self): conn = client._connection = make_connection( google.api_core.exceptions.NotFound("routine not found") ) + path = "/projects/routines-project/datasets/test_routines/routines/test_routine" - client.delete_routine( - "routines-project.test_routines.test_routine", not_found_ok=True - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_routine( + "routines-project.test_routines.test_routine", not_found_ok=True + ) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with( - method="DELETE", - 
path="/projects/routines-project/datasets/test_routines/routines/test_routine", - timeout=None, + method="DELETE", path=path, timeout=None, ) def test_delete_table(self): @@ -2989,7 +3394,15 @@ def test_delete_table(self): conn = client._connection = make_connection(*([{}] * len(tables))) for arg in tables: - client.delete_table(arg, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_table(arg, timeout=7.5) + + final_attributes.assert_called_once_with( + {"path": "/%s" % path}, client, None + ) + conn.api_request.assert_called_with( method="DELETE", path="/%s" % path, timeout=7.5 ) @@ -3012,7 +3425,12 @@ def test_delete_table_w_not_found_ok_false(self): ) with self.assertRaises(google.api_core.exceptions.NotFound): - client.delete_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) @@ -3027,9 +3445,14 @@ def test_delete_table_w_not_found_ok_true(self): google.api_core.exceptions.NotFound("table not found") ) - client.delete_table( - "{}.{}".format(self.DS_ID, self.TABLE_ID), not_found_ok=True - ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + client.delete_table( + "{}.{}".format(self.DS_ID, self.TABLE_ID), not_found_ok=True + ) + + final_attributes.assert_called_once_with({"path": path}, client, None) conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 9cd3631e1..d5497ffa8 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -620,15 +620,16 @@ def test__begin_defaults(self): builder.return_value = resource call_api = job._client._call_api = mock.Mock() call_api.return_value = resource + path = "/projects/{}/jobs".format(self.PROJECT) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": path}, None, job) call_api.assert_called_once_with( - DEFAULT_RETRY, - method="POST", - path="/projects/{}/jobs".format(self.PROJECT), - data=resource, - timeout=None, + DEFAULT_RETRY, method="POST", path=path, data=resource, timeout=None, ) self.assertEqual(job._properties, resource) @@ -651,15 +652,16 @@ def test__begin_explicit(self): call_api = client._call_api = mock.Mock() call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) + path = "/projects/{}/jobs".format(self.PROJECT) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(client=client, retry=retry, timeout=7.5) - job._begin(client=client, retry=retry, timeout=7.5) + final_attributes.assert_called_with({"path": path}, None, job) call_api.assert_called_once_with( - retry, - method="POST", - path="/projects/{}/jobs".format(self.PROJECT), - data=resource, - timeout=7.5, + retry, method="POST", path=path, data=resource, timeout=7.5, ) self.assertEqual(job._properties, resource) @@ -672,7 +674,16 @@ def test_exists_defaults_miss(self): call_api = job._client._call_api = mock.Mock() call_api.side_effect = 
NotFound("testing") - self.assertFalse(job.exists()) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(job.exists()) + + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) call_api.assert_called_once_with( DEFAULT_RETRY, @@ -699,8 +710,16 @@ def test_exists_explicit_hit(self): call_api = client._call_api = mock.Mock() call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.exists(client=client, retry=retry)) - self.assertTrue(job.exists(client=client, retry=retry)) + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) call_api.assert_called_once_with( retry, @@ -716,8 +735,12 @@ def test_exists_w_timeout(self): PATH = "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID) job = self._set_properties_job() call_api = job._client._call_api = mock.Mock() + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.exists(timeout=7.5) - job.exists(timeout=7.5) + final_attributes.assert_called_with({"path": PATH}, None, job) call_api.assert_called_once_with( DEFAULT_RETRY, @@ -742,8 +765,16 @@ def test_reload_defaults(self): job._properties["jobReference"]["location"] = self.LOCATION call_api = job._client._call_api = mock.Mock() call_api.return_value = resource + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload() - job.reload() + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) call_api.assert_called_once_with( DEFAULT_RETRY, @@ -771,8 +802,16 @@ def test_reload_explicit(self): call_api = client._call_api = mock.Mock() call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(client=client, retry=retry, timeout=4.2) - job.reload(client=client, retry=retry, timeout=4.2) + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) call_api.assert_called_once_with( retry, @@ -796,8 +835,16 @@ def test_cancel_defaults(self): job = self._set_properties_job() job._properties["jobReference"]["location"] = self.LOCATION connection = job._client._connection = _make_connection(response) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.cancel()) - self.assertTrue(job.cancel()) + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) connection.api_request.assert_called_once_with( method="POST", @@ -821,8 +868,16 @@ def test_cancel_explicit(self): job = self._set_properties_job() client = _make_client(project=other_project) connection = client._connection = _make_connection(response) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.cancel(client=client, timeout=7.5)) - self.assertTrue(job.cancel(client=client, timeout=7.5)) + 
final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) connection.api_request.assert_called_once_with( method="POST", @@ -855,7 +910,12 @@ def test_cancel_w_custom_retry(self): ) with api_request_patcher as fake_api_request: - result = job.cancel(retry=retry, timeout=7.5) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + result = job.cancel(retry=retry, timeout=7.5) + + final_attributes.assert_called_with({"path": api_path}, None, job) self.assertTrue(result) self.assertEqual(job._properties, resource) @@ -2343,12 +2403,17 @@ def test_begin_w_bound_client(self): conn = _make_connection(RESOURCE) client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client) + path = "/projects/{}/jobs".format(self.PROJECT) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": path}, None, job) conn.api_request.assert_called_once_with( method="POST", - path="/projects/{}/jobs".format(self.PROJECT), + path=path, data={ "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": { @@ -2384,7 +2449,12 @@ def test_begin_w_autodetect(self): job = self._make_one( self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client, config ) - job._begin() + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() + + final_attributes.assert_called_with({"path": path}, None, job) sent = { "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, @@ -2478,8 +2548,12 @@ def test_begin_w_alternate_client(self): config.use_avro_logical_types = True config.write_disposition = WriteDisposition.WRITE_TRUNCATE config.schema_update_options = [SchemaUpdateOption.ALLOW_FIELD_ADDITION] + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(client=client2) - job._begin(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() self.assertEqual(len(conn2.api_request.call_args_list), 1) @@ -2504,8 +2578,13 @@ def test_begin_w_job_reference(self): conn = _make_connection(resource) client = _make_client(project=self.PROJECT, connection=conn) load_job = self._make_one(job_ref, [self.SOURCE1], self.TABLE_REF, client) - - load_job._begin() + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + load_job._begin() + final_attributes.assert_called_with( + {"path": "/projects/alternative-project/jobs"}, None, load_job + ) conn.api_request.assert_called_once() _, request = conn.api_request.call_args @@ -2522,8 +2601,16 @@ def test_exists_miss_w_bound_client(self): conn = _make_connection() client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(job.exists()) - self.assertFalse(job.exists()) + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) conn.api_request.assert_called_once_with( method="GET", path=PATH, 
query_params={"fields": "id"}, timeout=None @@ -2536,8 +2623,16 @@ def test_exists_hit_w_alternate_client(self): conn2 = _make_connection({}) client2 = _make_client(project=self.PROJECT, connection=conn2) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.exists(client=client2)) - self.assertTrue(job.exists(client=client2)) + final_attributes.assert_called_with( + {"path": "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID)}, + None, + job, + ) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -2551,8 +2646,14 @@ def test_exists_miss_w_job_reference(self): conn = _make_connection() client = _make_client(project=self.PROJECT, connection=conn) load_job = self._make_one(job_ref, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(load_job.exists()) - self.assertFalse(load_job.exists()) + final_attributes.assert_called_with( + {"path": "/projects/other-project/jobs/my-job-id"}, None, load_job + ) conn.api_request.assert_called_once_with( method="GET", @@ -2567,8 +2668,12 @@ def test_reload_w_bound_client(self): conn = _make_connection(RESOURCE) client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload() - job.reload() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, query_params={}, timeout=None @@ -2583,8 +2688,12 @@ def test_reload_w_alternate_client(self): conn2 = _make_connection(RESOURCE) client2 = _make_client(project=self.PROJECT, connection=conn2) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(client=client2) - job.reload(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -2602,8 +2711,16 @@ def test_reload_w_job_reference(self): conn = _make_connection(resource) client = _make_client(project=self.PROJECT, connection=conn) load_job = self._make_one(job_ref, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + load_job.reload() - load_job.reload() + final_attributes.assert_called_with( + {"path": "/projects/alternative-project/jobs/{}".format(self.JOB_ID)}, + None, + load_job, + ) conn.api_request.assert_called_once_with( method="GET", @@ -2619,8 +2736,12 @@ def test_cancel_w_bound_client(self): conn = _make_connection(RESPONSE) client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.cancel() - job.cancel() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="POST", path=PATH, query_params={}, timeout=None, @@ -2636,8 +2757,12 @@ def 
test_cancel_w_alternate_client(self): conn2 = _make_connection(RESPONSE) client2 = _make_client(project=self.PROJECT, connection=conn2) job = self._make_one(self.JOB_ID, [self.SOURCE1], self.TABLE_REF, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.cancel(client=client2) - job.cancel(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -2655,9 +2780,20 @@ def test_cancel_w_job_reference(self): conn = _make_connection({"job": resource}) client = _make_client(project=self.PROJECT, connection=conn) load_job = self._make_one(job_ref, [self.SOURCE1], self.TABLE_REF, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + load_job.cancel() - load_job.cancel() - + final_attributes.assert_called_with( + { + "path": "/projects/alternative-project/jobs/{}/cancel".format( + self.JOB_ID + ) + }, + None, + load_job, + ) conn.api_request.assert_called_once_with( method="POST", path="/projects/alternative-project/jobs/{}/cancel".format(self.JOB_ID), @@ -2952,8 +3088,12 @@ def test_begin_w_bound_client(self): source = self._table_ref(self.SOURCE_TABLE) destination = self._table_ref(self.DESTINATION_TABLE) job = self._make_one(self.JOB_ID, [source], destination, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="POST", @@ -3016,7 +3156,12 @@ def test_begin_w_alternate_client(self): config.create_disposition = CreateDisposition.CREATE_NEVER config.write_disposition = WriteDisposition.WRITE_TRUNCATE job = self._make_one(self.JOB_ID, [source], destination, client1, config) - job._begin(client=client2) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(client=client2) + + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -3038,8 +3183,12 @@ def test_exists_miss_w_bound_client(self): source = self._table_ref(self.SOURCE_TABLE) destination = self._table_ref(self.DESTINATION_TABLE) job = self._make_one(self.JOB_ID, [source], destination, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(job.exists()) - self.assertFalse(job.exists()) + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, @@ -3054,8 +3203,12 @@ def test_exists_hit_w_alternate_client(self): source = self._table_ref(self.SOURCE_TABLE) destination = self._table_ref(self.DESTINATION_TABLE) job = self._make_one(self.JOB_ID, [source], destination, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.exists(client=client2)) - self.assertTrue(job.exists(client=client2)) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -3070,8 +3223,12 @@ def test_reload_w_bound_client(self): source = 
self._table_ref(self.SOURCE_TABLE) destination = self._table_ref(self.DESTINATION_TABLE) job = self._make_one(self.JOB_ID, [source], destination, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload() - job.reload() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, query_params={}, timeout=None @@ -3088,8 +3245,12 @@ def test_reload_w_alternate_client(self): source = self._table_ref(self.SOURCE_TABLE) destination = self._table_ref(self.DESTINATION_TABLE) job = self._make_one(self.JOB_ID, [source], destination, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(client=client2) - job.reload(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -3349,8 +3510,12 @@ def test_begin_w_bound_client(self): source_dataset = DatasetReference(self.PROJECT, self.DS_ID) source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_ID, source, [self.DESTINATION_URI], client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="POST", @@ -3407,8 +3572,12 @@ def test_begin_w_alternate_client(self): job = self._make_one( self.JOB_ID, source, [self.DESTINATION_URI], client1, config ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(client=client2) - job._begin(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -3429,8 +3598,12 @@ def test_exists_miss_w_bound_client(self): job = self._make_one( self.JOB_ID, self.TABLE_REF, [self.DESTINATION_URI], client ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(job.exists()) - self.assertFalse(job.exists()) + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, @@ -3445,8 +3618,12 @@ def test_exists_hit_w_alternate_client(self): job = self._make_one( self.JOB_ID, self.TABLE_REF, [self.DESTINATION_URI], client1 ) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.exists(client=client2)) - self.assertTrue(job.exists(client=client2)) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -3463,9 +3640,12 @@ def test_reload_w_bound_client(self): source_dataset = DatasetReference(self.PROJECT, self.DS_ID) source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_ID, source, [self.DESTINATION_URI], client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload() - job.reload() - + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, 
query_params={}, timeout=None ) @@ -3483,8 +3663,12 @@ def test_reload_w_alternate_client(self): source_dataset = DatasetReference(self.PROJECT, self.DS_ID) source = source_dataset.table(self.SOURCE_TABLE) job = self._make_one(self.JOB_ID, source, [self.DESTINATION_URI], client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(client=client2) - job.reload(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -4823,8 +5007,12 @@ def test__begin_w_timeout(self): conn = _make_connection(RESOURCE) client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, self.QUERY, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(timeout=7.5) - job._begin(timeout=7.5) + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="POST", @@ -4856,8 +5044,12 @@ def test_begin_w_bound_client(self): config = QueryJobConfig() config.default_dataset = DatasetReference(self.PROJECT, DS_ID) job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertIsNone(job.default_dataset) self.assertEqual(job.udf_resources, []) @@ -4936,8 +5128,12 @@ def test_begin_w_alternate_client(self): config.maximum_bytes_billed = 123456 config.schema_update_options = [SchemaUpdateOption.ALLOW_FIELD_RELAXATION] job = self._make_one(self.JOB_ID, self.QUERY, client1, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin(client=client2) - job._begin(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -4978,8 +5174,12 @@ def test_begin_w_udf(self): config.udf_resources = udf_resources config.use_legacy_sql = True job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertEqual(job.udf_resources, udf_resources) conn.api_request.assert_called_once_with( @@ -5028,8 +5228,12 @@ def test_begin_w_named_query_parameter(self): jconfig = QueryJobConfig() jconfig.query_parameters = query_parameters job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=jconfig) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertEqual(job.query_parameters, query_parameters) conn.api_request.assert_called_once_with( @@ -5072,8 +5276,12 @@ def test_begin_w_positional_query_parameter(self): jconfig = QueryJobConfig() jconfig.query_parameters = query_parameters job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=jconfig) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + 
job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertEqual(job.query_parameters, query_parameters) conn.api_request.assert_called_once_with( @@ -5148,8 +5356,12 @@ def test_begin_w_table_defs(self): config.table_definitions = {bt_table: bt_config, csv_table: csv_config} config.use_legacy_sql = True job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="POST", @@ -5187,8 +5399,12 @@ def test_dry_run_query(self): config = QueryJobConfig() config.dry_run = True job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job._begin() - job._begin() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertEqual(job.udf_resources, []) conn.api_request.assert_called_once_with( method="POST", @@ -5209,8 +5425,12 @@ def test_exists_miss_w_bound_client(self): conn = _make_connection() client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, self.QUERY, client) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertFalse(job.exists()) - self.assertFalse(job.exists()) + final_attributes.assert_called_with({"path": PATH}, None, job) conn.api_request.assert_called_once_with( method="GET", path=PATH, query_params={"fields": "id"}, timeout=None @@ -5223,8 +5443,12 @@ def test_exists_hit_w_alternate_client(self): conn2 = _make_connection({}) client2 = _make_client(project=self.PROJECT, connection=conn2) job = self._make_one(self.JOB_ID, self.QUERY, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + self.assertTrue(job.exists(client=client2)) - self.assertTrue(job.exists(client=client2)) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -5246,8 +5470,12 @@ def test_reload_w_bound_client(self): config = QueryJobConfig() config.destination = table_ref job = self._make_one(self.JOB_ID, None, client, job_config=config) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload() - job.reload() + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertNotEqual(job.destination, table_ref) @@ -5272,8 +5500,12 @@ def test_reload_w_alternate_client(self): conn2 = _make_connection(RESOURCE) client2 = _make_client(project=self.PROJECT, connection=conn2) job = self._make_one(self.JOB_ID, self.QUERY, client1) + with mock.patch( + "google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(client=client2) - job.reload(client=client2) + final_attributes.assert_called_with({"path": PATH}, None, job) conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( @@ -5296,8 +5528,12 @@ def test_reload_w_timeout(self): config = QueryJobConfig() config.destination = table_ref job = self._make_one(self.JOB_ID, None, client, job_config=config) + with mock.patch( + 
"google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes" + ) as final_attributes: + job.reload(timeout=4.2) - job.reload(timeout=4.2) + final_attributes.assert_called_with({"path": PATH}, None, job) self.assertNotEqual(job.destination, table_ref) diff --git a/tests/unit/test_opentelemetry_tracing.py b/tests/unit/test_opentelemetry_tracing.py new file mode 100644 index 000000000..1c35b0a82 --- /dev/null +++ b/tests/unit/test_opentelemetry_tracing.py @@ -0,0 +1,212 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import sys + +import mock + +try: + import opentelemetry + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) +except ImportError: + opentelemetry = None +import pytest +from six.moves import reload_module + +from google.cloud.bigquery import opentelemetry_tracing + +TEST_SPAN_NAME = "bar" +TEST_SPAN_ATTRIBUTES = {"foo": "baz"} + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +@pytest.fixture +def setup(): + reload_module(opentelemetry_tracing) + tracer_provider = TracerProvider() + memory_exporter = InMemorySpanExporter() + span_processor = SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + trace.set_tracer_provider(tracer_provider) + yield memory_exporter + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def test_opentelemetry_not_installed(setup, monkeypatch): + monkeypatch.setitem(sys.modules, "opentelemetry", None) + reload_module(opentelemetry_tracing) + with opentelemetry_tracing.create_span("No-op for opentelemetry") as span: + assert span is None + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def test_opentelemetry_success(setup): + expected_attributes = {"foo": "baz", "db.system": "BigQuery"} + + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, client=None, job_ref=None + ) as span: + assert span is not None + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def test_default_client_attributes(setup): + expected_attributes = { + "foo": "baz", + "db.system": "BigQuery", + "db.name": "test_project", + "location": "test_location", + } + with mock.patch("google.cloud.bigquery.client.Client") as test_client: + test_client.project = "test_project" + test_client.location = "test_location" + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, client=test_client + ) as span: + assert span is not None + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def 
test_default_job_attributes(setup): + import google.cloud._helpers + + time_created = datetime.datetime( + 2010, 5, 19, 16, 0, 0, tzinfo=google.cloud._helpers.UTC + ) + started_time = datetime.datetime( + 2011, 10, 1, 16, 0, 0, tzinfo=google.cloud._helpers.UTC + ) + ended_time = datetime.datetime( + 2011, 10, 2, 16, 0, 0, tzinfo=google.cloud._helpers.UTC + ) + error_result = [ + {"errorResult1": "some_error_result1", "errorResult2": "some_error_result2"} + ] + + expected_attributes = { + "db.system": "BigQuery", + "db.name": "test_project_id", + "location": "test_location", + "num_child_jobs": "0", + "job_id": "test_job_id", + "foo": "baz", + "parent_job_id": "parent_job_id", + "timeCreated": time_created.isoformat(), + "timeStarted": started_time.isoformat(), + "timeEnded": ended_time.isoformat(), + "hasErrors": True, + "state": "some_job_state", + } + with mock.patch("google.cloud.bigquery.job._AsyncJob") as test_job_ref: + test_job_ref.job_id = "test_job_id" + test_job_ref.location = "test_location" + test_job_ref.project = "test_project_id" + test_job_ref.num_child_jobs = "0" + test_job_ref.parent_job_id = "parent_job_id" + test_job_ref.created = time_created + test_job_ref.started = started_time + test_job_ref.ended = ended_time + test_job_ref.error_result = error_result + test_job_ref.state = "some_job_state" + + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, job_ref=test_job_ref + ) as span: + assert span is not None + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def test_default_no_data_leakage(setup): + import google.auth.credentials + from google.cloud.bigquery import client + from google.cloud.bigquery import job + + mock_credentials = mock.Mock(spec=google.auth.credentials.Credentials) + test_client = client.Client( + project="test_project", credentials=mock_credentials, location="test_location" + ) + + expected_attributes = { + "foo": "baz", + "db.system": "BigQuery", + "db.name": "test_project", + "location": "test_location", + } + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, client=test_client + ) as span: + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + + test_job_reference = job._JobReference( + job_id="test_job_id", project="test_project_id", location="test_location" + ) + test_client = client.Client( + project="test_project", credentials=mock_credentials, location="test_location" + ) + test_job = job._AsyncJob(job_id=test_job_reference, client=test_client) + + expected_attributes = { + "db.system": "BigQuery", + "db.name": "test_project_id", + "location": "test_location", + "num_child_jobs": 0, + "job_id": "test_job_id", + "foo": "baz", + "hasErrors": False, + } + + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, job_ref=test_job + ) as span: + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + + +@pytest.mark.skipif(opentelemetry is None, reason="Require `opentelemetry`") +def test_span_creation_error(setup): + import google.auth.credentials + from google.cloud.bigquery import client + from google.api_core.exceptions import GoogleAPICallError, InvalidArgument + + mock_credentials = mock.Mock(spec=google.auth.credentials.Credentials) + test_client = client.Client( + project="test_project", credentials=mock_credentials, location="test_location" + ) + + 
expected_attributes = { + "foo": "baz", + "db.system": "BigQuery", + "db.name": "test_project", + "location": "test_location", + } + with pytest.raises(GoogleAPICallError): + with opentelemetry_tracing.create_span( + TEST_SPAN_NAME, attributes=TEST_SPAN_ATTRIBUTES, client=test_client + ) as span: + assert span.name == TEST_SPAN_NAME + assert span.attributes == expected_attributes + raise InvalidArgument("test_error")
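
For a quick local check of the tracing exercised by these tests, spans can be
routed to the console rather than an in-memory or Cloud Trace exporter. The
sketch below is illustrative only and is not part of this patch; it assumes the
``opentelemetry-sdk`` package is installed and that application default
credentials are available for the ``bigquery.Client()`` call:

.. code-block:: python

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import (
        ConsoleSpanExporter,
        SimpleExportSpanProcessor,
    )

    from google.cloud import bigquery

    # Export each span to stdout synchronously as soon as it ends.
    trace.set_tracer_provider(TracerProvider())
    trace.get_tracer_provider().add_span_processor(
        SimpleExportSpanProcessor(ConsoleSpanExporter())
    )

    # Assumes application default credentials and a default project are configured.
    client = bigquery.Client()

    # Instrumented client methods now print a span for each API request they make.
    client.get_service_account_email()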