Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Firstore Async Instrumentation #882

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e07ffc3
Remove unnecessary instrumentation
TimPansino Jul 24, 2023
ba7850a
Simplify existing instrumentation
TimPansino Jul 24, 2023
19f5a48
Remove unnecessary settings lookups
TimPansino Jul 24, 2023
b9eaa5b
Client instrumentation
TimPansino Jul 25, 2023
44598cc
Add query and aggregation query instrumentation
TimPansino Jul 25, 2023
a0c78a2
Fix deprecation warning
TimPansino Jul 25, 2023
b4e8700
Simplify collection lookup
TimPansino Jul 25, 2023
6214f0b
Combine query test files
TimPansino Jul 25, 2023
d17b62f
Rename methods for clarity
TimPansino Jul 25, 2023
2ce45c8
Instrument Firestore batching
TimPansino Jul 25, 2023
ef06df5
Add transaction instrumentation
TimPansino Jul 26, 2023
b9a91e5
Consumer iterators on <=Py38
TimPansino Jul 26, 2023
fbe40ea
Add async generator wrapper
TimPansino Jul 26, 2023
5693dd2
Allow better parallelization in firestore tests
TimPansino Jul 26, 2023
c857358
Fix issue in async generator wrapper
TimPansino Jul 26, 2023
c49a1cf
Add async client instrumentation
TimPansino Jul 26, 2023
7851baf
Squashed commit of the following:
TimPansino Jul 26, 2023
3fb6a6c
Add async collection instrumentation
TimPansino Jul 26, 2023
aab244b
Add async document instrumentation
TimPansino Jul 26, 2023
c392e78
Async Query instrumentation
TimPansino Jul 26, 2023
6b7fc79
Add async batch instrumentation
TimPansino Jul 26, 2023
e04ec6f
Add instrumentation for AsyncTransaction
TimPansino Jul 27, 2023
87fbe62
Squashed commit of the following:
TimPansino Jul 27, 2023
b6bc9a4
Remove reset_firestore
TimPansino Jul 27, 2023
9266924
Re-merge of test_query
TimPansino Jul 27, 2023
5902515
Use public API imports
TimPansino Jul 27, 2023
d3e4732
Add async collection group instrumentation
TimPansino Jul 27, 2023
7bf6f49
Refactor exercise functions to fixtures
TimPansino Jul 31, 2023
4a8a3fe
Merge remote-tracking branch 'origin/develop-google-firestore-instrum…
TimPansino Jul 31, 2023
29579fc
Merge branch 'develop-google-firestore-instrumentation' into feature-…
mergify[bot] Aug 2, 2023
ad2999f
Squashed commit of the following:
TimPansino Aug 2, 2023
844e556
Remove custom wrapper code from firestore
TimPansino Aug 2, 2023
db3561e
Merge branch 'develop-google-firestore-instrumentation' into feature-…
mergify[bot] Aug 2, 2023
31b6db5
Merge branch 'develop-google-firestore-instrumentation' into feature-…
mergify[bot] Aug 2, 2023
dabfc4a
Undo wrapper edits
TimPansino Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2279,31 +2279,61 @@ def _process_module_builtin_defaults():
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_client",
)
_process_module_definition(
"google.cloud.firestore_v1.async_client",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_client",
)
_process_module_definition(
"google.cloud.firestore_v1.document",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_document",
)
_process_module_definition(
"google.cloud.firestore_v1.async_document",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_document",
)
_process_module_definition(
"google.cloud.firestore_v1.collection",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_collection",
)
_process_module_definition(
"google.cloud.firestore_v1.async_collection",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_collection",
)
_process_module_definition(
"google.cloud.firestore_v1.query",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_query",
)
_process_module_definition(
"google.cloud.firestore_v1.async_query",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_query",
)
_process_module_definition(
"google.cloud.firestore_v1.aggregation",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_aggregation",
)
_process_module_definition(
"google.cloud.firestore_v1.async_aggregation",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_aggregation",
)
_process_module_definition(
"google.cloud.firestore_v1.batch",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_batch",
)
_process_module_definition(
"google.cloud.firestore_v1.async_batch",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_batch",
)
_process_module_definition(
"google.cloud.firestore_v1.bulk_batch",
"newrelic.hooks.datastore_firestore",
Expand All @@ -2314,6 +2344,11 @@ def _process_module_builtin_defaults():
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_transaction",
)
_process_module_definition(
"google.cloud.firestore_v1.async_transaction",
"newrelic.hooks.datastore_firestore",
"instrument_google_cloud_firestore_v1_async_transaction",
)

_process_module_definition(
"ariadne.asgi",
Expand Down
121 changes: 99 additions & 22 deletions newrelic/hooks/datastore_firestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from newrelic.api.datastore_trace import DatastoreTrace, wrap_datastore_trace
from newrelic.api.datastore_trace import wrap_datastore_trace
from newrelic.api.function_trace import wrap_function_trace
from newrelic.common.async_wrapper import generator_wrapper
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.common.async_wrapper import generator_wrapper, async_generator_wrapper


_get_object_id = lambda obj, *args, **kwargs: getattr(obj, "id", None)
_get_parent_id = lambda obj, *args, **kwargs: getattr(getattr(obj, "_parent", None), "id", None)
_get_collection_ref_id = lambda obj, *args, **kwargs: getattr(getattr(obj, "_collection_ref", None), "id", None)


def wrap_generator_method(module, class_name, method_name, target):
def _wrapper(wrapped, instance, args, kwargs):
target_ = target(instance) if callable(target) else target
trace = DatastoreTrace(product="Firestore", target=target_, operation=method_name)
wrapped = generator_wrapper(wrapped, trace)
return wrapped(*args, **kwargs)

class_ = getattr(module, class_name)
if class_ is not None:
if hasattr(class_, method_name):
wrap_function_wrapper(module, "%s.%s" % (class_name, method_name), _wrapper)


def instrument_google_cloud_firestore_v1_base_client(module):
rollup = ("Datastore/all", "Datastore/Firestore/all")
wrap_function_trace(
Expand All @@ -48,7 +34,15 @@ def instrument_google_cloud_firestore_v1_client(module):
class_ = module.Client
for method in ("collections", "get_all"):
if hasattr(class_, method):
wrap_generator_method(module, "Client", method, target=None)
wrap_datastore_trace(module, "Client.%s" % method, operation=method, product="Firestore", target=None, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_client(module):
if hasattr(module, "AsyncClient"):
class_ = module.AsyncClient
for method in ("collections", "get_all"):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncClient.%s" % method, operation=method, product="Firestore", target=None, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_collection(module):
Expand All @@ -66,7 +60,21 @@ def instrument_google_cloud_firestore_v1_collection(module):

for method in ("stream", "list_documents"):
if hasattr(class_, method):
wrap_generator_method(module, "CollectionReference", method, target=_get_object_id)
wrap_datastore_trace(module, "CollectionReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_collection(module):
if hasattr(module, "AsyncCollectionReference"):
class_ = module.AsyncCollectionReference
for method in ("add", "get"):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncCollectionReference.%s" % method, product="Firestore", target=_get_object_id, operation=method
)

for method in ("stream", "list_documents"):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncCollectionReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_document(module):
Expand All @@ -84,7 +92,21 @@ def instrument_google_cloud_firestore_v1_document(module):

for method in ("collections",):
if hasattr(class_, method):
wrap_generator_method(module, "DocumentReference", method, target=_get_object_id)
wrap_datastore_trace(module, "DocumentReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_document(module):
if hasattr(module, "AsyncDocumentReference"):
class_ = module.AsyncDocumentReference
for method in ("create", "delete", "get", "set", "update"):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncDocumentReference.%s" % method, product="Firestore", target=_get_object_id, operation=method
)

for method in ("collections",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncDocumentReference.%s" % method, operation=method, product="Firestore", target=_get_object_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_query(module):
Expand All @@ -98,13 +120,33 @@ def instrument_google_cloud_firestore_v1_query(module):

for method in ("stream",):
if hasattr(class_, method):
wrap_generator_method(module, "Query", method, target=_get_parent_id)
wrap_datastore_trace(module, "Query.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=generator_wrapper)

if hasattr(module, "CollectionGroup"):
class_ = module.CollectionGroup
for method in ("get_partitions",):
if hasattr(class_, method):
wrap_generator_method(module, "CollectionGroup", method, target=_get_parent_id)
wrap_datastore_trace(module, "CollectionGroup.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_query(module):
if hasattr(module, "AsyncQuery"):
class_ = module.AsyncQuery
for method in ("get",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncQuery.%s" % method, product="Firestore", target=_get_parent_id, operation=method
)

for method in ("stream",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncQuery.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=async_generator_wrapper)

if hasattr(module, "AsyncCollectionGroup"):
class_ = module.AsyncCollectionGroup
for method in ("get_partitions",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncCollectionGroup.%s" % method, operation=method, product="Firestore", target=_get_parent_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_aggregation(module):
Expand All @@ -122,7 +164,21 @@ def instrument_google_cloud_firestore_v1_aggregation(module):

for method in ("stream",):
if hasattr(class_, method):
wrap_generator_method(module, "AggregationQuery", method, target=_get_collection_ref_id)
wrap_datastore_trace(module, "AggregationQuery.%s" % method, operation=method, product="Firestore", target=_get_collection_ref_id, async_wrapper=generator_wrapper)


def instrument_google_cloud_firestore_v1_async_aggregation(module):
if hasattr(module, "AsyncAggregationQuery"):
class_ = module.AsyncAggregationQuery
for method in ("get",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncAggregationQuery.%s" % method, product="Firestore", target=_get_collection_ref_id, operation=method
)

for method in ("stream",):
if hasattr(class_, method):
wrap_datastore_trace(module, "AsyncAggregationQuery.%s" % method, operation=method, product="Firestore", target=_get_collection_ref_id, async_wrapper=async_generator_wrapper)


def instrument_google_cloud_firestore_v1_batch(module):
Expand All @@ -135,6 +191,16 @@ def instrument_google_cloud_firestore_v1_batch(module):
)


def instrument_google_cloud_firestore_v1_async_batch(module):
if hasattr(module, "AsyncWriteBatch"):
class_ = module.AsyncWriteBatch
for method in ("commit",):
if hasattr(class_, method):
wrap_datastore_trace(
module, "AsyncWriteBatch.%s" % method, product="Firestore", target=None, operation=method
)


def instrument_google_cloud_firestore_v1_bulk_batch(module):
if hasattr(module, "BulkWriteBatch"):
class_ = module.BulkWriteBatch
Expand All @@ -154,3 +220,14 @@ def instrument_google_cloud_firestore_v1_transaction(module):
wrap_datastore_trace(
module, "Transaction.%s" % method, product="Firestore", target=None, operation=operation
)


def instrument_google_cloud_firestore_v1_async_transaction(module):
if hasattr(module, "AsyncTransaction"):
class_ = module.AsyncTransaction
for method in ("_commit", "_rollback"):
if hasattr(class_, method):
operation = method[1:] # Trim leading underscore
wrap_datastore_trace(
module, "AsyncTransaction.%s" % method, product="Firestore", target=None, operation=operation
)
38 changes: 38 additions & 0 deletions tests/datastore_firestore/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
import uuid

import pytest

from google.cloud.firestore import Client
from google.cloud.firestore import Client, AsyncClient

from testing_support.db_settings import firestore_settings
from testing_support.fixture.event_loop import event_loop as loop # noqa: F401; pylint: disable=W0611
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
Expand Down Expand Up @@ -62,6 +66,20 @@ def collection(client):
client.recursive_delete(collection_)


@pytest.fixture(scope="session")
def async_client(loop):
os.environ["FIRESTORE_EMULATOR_HOST"] = "%s:%d" % (FIRESTORE_HOST, FIRESTORE_PORT)
client = AsyncClient()
loop.run_until_complete(client.collection("healthcheck").document("healthcheck").set({}, retry=None, timeout=5)) # Ensure connection is available
return client


@pytest.fixture(scope="function")
def async_collection(async_client, collection):
# Use the same collection name as the collection fixture
yield async_client.collection(collection.id)


@pytest.fixture(scope="session")
def assert_trace_for_generator():
def _assert_trace_for_generator(generator_func, *args, **kwargs):
Expand All @@ -76,3 +94,23 @@ def _assert_trace_for_generator(generator_func, *args, **kwargs):
assert current_trace() is txn # Generator trace has exited.

return _assert_trace_for_generator


@pytest.fixture(scope="session")
def assert_trace_for_async_generator(loop):
def _assert_trace_for_async_generator(generator_func, *args, **kwargs):
_trace_check = []
txn = current_trace()
assert not isinstance(txn, DatastoreTrace)

async def coro():
# Check for generator trace on collections
async for _ in generator_func(*args, **kwargs):
_trace_check.append(isinstance(current_trace(), DatastoreTrace))

loop.run_until_complete(coro())

assert _trace_check and all(_trace_check) # All checks are True, and at least 1 is present.
assert current_trace() is txn # Generator trace has exited.

return _assert_trace_for_async_generator
56 changes: 56 additions & 0 deletions tests/datastore_firestore/test_async_batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
from newrelic.api.background_task import background_task
from testing_support.validators.validate_database_duration import (
validate_database_duration,
)


@pytest.fixture()
def exercise_async_write_batch(async_client, async_collection):
async def _exercise_async_write_batch():
docs = [async_collection.document(str(x)) for x in range(1, 4)]
async_batch = async_client.batch()
for doc in docs:
async_batch.set(doc, {})

await async_batch.commit()
return _exercise_async_write_batch


def test_firestore_async_write_batch(loop, exercise_async_write_batch):
_test_scoped_metrics = [
("Datastore/operation/Firestore/commit", 1),
]

_test_rollup_metrics = [
("Datastore/all", 1),
("Datastore/allOther", 1),
]
@validate_database_duration()
@validate_transaction_metrics(
"test_firestore_async_write_batch",
scoped_metrics=_test_scoped_metrics,
rollup_metrics=_test_rollup_metrics,
background_task=True,
)
@background_task(name="test_firestore_async_write_batch")
def _test():
loop.run_until_complete(exercise_async_write_batch())

_test()
Loading