From 6bb6d3896d861c875ef3bbc0fec4c07347c9b00c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Po=C5=BAniak?= Date: Thu, 3 Oct 2024 00:42:33 +0200 Subject: [PATCH] Add additional attributes for redis.search methods create_index, search (#2635) --- CHANGELOG.md | 2 + .../instrumentation/redis/__init__.py | 73 ++++++++++++++++++- .../instrumentation/redis/util.py | 12 +++ .../tests/docker-compose.yml | 2 +- .../tests/redis/test_redis_functional.py | 69 ++++++++++++++++++ 5 files changed, 153 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d25b0b3a98..e4dcc10c7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860)) - `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka ([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082)) +- `opentelemetry-instrumentation-redis` Add additional attributes for methods create_index and search, rename those spans + ([#2635](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2635)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index 08337c2d4a..1d3b8b8a87 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -102,6 +102,8 @@ def response_hook(span, instance, response): from opentelemetry.instrumentation.redis.util import ( _extract_conn_attributes, _format_command_args, + _set_span_attribute_if_value, + _value_or_none, ) from opentelemetry.instrumentation.redis.version import __version__ from opentelemetry.instrumentation.utils import unwrap @@ -126,6 +128,8 @@ def response_hook(span, instance, response): _REDIS_CLUSTER_VERSION = (4, 1, 0) _REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2) +_FIELD_TYPES = ["NUMERIC", "TEXT", "GEO", "TAG", "VECTOR"] + def _set_connection_attributes(span, conn): if not span.is_recording() or not hasattr(conn, "connection_pool"): @@ -138,7 +142,12 @@ def _set_connection_attributes(span, conn): def _build_span_name(instance, cmd_args): if len(cmd_args) > 0 and cmd_args[0]: - name = cmd_args[0] + if cmd_args[0] == "FT.SEARCH": + name = "redis.search" + elif cmd_args[0] == "FT.CREATE": + name = "redis.create_index" + else: + name = cmd_args[0] else: name = instance.connection_pool.connection_kwargs.get("db", 0) return name @@ -181,7 +190,6 @@ def _instrument( def _traced_execute_command(func, instance, args, kwargs): query = _format_command_args(args) name = _build_span_name(instance, args) - with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: @@ -189,9 +197,14 @@ def _traced_execute_command(func, instance, args, kwargs): span.set_attribute(SpanAttributes.DB_STATEMENT, query) _set_connection_attributes(span, instance) span.set_attribute("db.redis.args_length", len(args)) + if span.name == "redis.create_index": + _add_create_attributes(span, args) if callable(request_hook): request_hook(span, instance, args, kwargs) response = func(*args, **kwargs) + if span.is_recording(): + if span.name == "redis.search": + _add_search_attributes(span, response, args) if callable(response_hook): response_hook(span, instance, response) return response @@ -202,9 +215,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs): resource, span_name, ) = _build_span_meta_data_for_pipeline(instance) - exception = None - with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: @@ -230,6 +241,60 @@ def _traced_execute_pipeline(func, instance, args, kwargs): return response + def _add_create_attributes(span, args): + _set_span_attribute_if_value( + span, "redis.create_index.index", _value_or_none(args, 1) + ) + # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command + try: + schema_index = args.index("SCHEMA") + except ValueError: + return + schema = args[schema_index:] + field_attribute = "" + # Schema in format: + # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...] + field_attribute = "".join( + f"Field(name: {schema[index - 1]}, type: {schema[index]});" + for index in range(1, len(schema)) + if schema[index] in _FIELD_TYPES + ) + _set_span_attribute_if_value( + span, + "redis.create_index.fields", + field_attribute, + ) + + def _add_search_attributes(span, response, args): + _set_span_attribute_if_value( + span, "redis.search.index", _value_or_none(args, 1) + ) + _set_span_attribute_if_value( + span, "redis.search.query", _value_or_none(args, 2) + ) + # Parse response from search + # https://redis.io/docs/latest/commands/ft.search/ + # Response in format: + # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...] + # Returned documents in array format: + # [first_field_name, first_field_value, second_field_name, second_field_value ...] + number_of_returned_documents = _value_or_none(response, 0) + _set_span_attribute_if_value( + span, "redis.search.total", number_of_returned_documents + ) + if "NOCONTENT" in args or not number_of_returned_documents: + return + for document_number in range(number_of_returned_documents): + document_index = _value_or_none(response, 1 + 2 * document_number) + if document_index: + document = response[2 + 2 * document_number] + for attribute_name_index in range(0, len(document), 2): + _set_span_attribute_if_value( + span, + f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}", + document[attribute_name_index + 1], + ) + pipeline_class = ( "BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline" ) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py index 4703bc271f..24ca387861 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py @@ -68,3 +68,15 @@ def _format_command_args(args): out_str = "" return out_str + + +def _set_span_attribute_if_value(span, name, value): + if value is not None and value != "": + span.set_attribute(name, value) + + +def _value_or_none(values, n): + try: + return values[n] + except IndexError: + return None diff --git a/tests/opentelemetry-docker-tests/tests/docker-compose.yml b/tests/opentelemetry-docker-tests/tests/docker-compose.yml index 59f0e42d3d..02a3721d9b 100644 --- a/tests/opentelemetry-docker-tests/tests/docker-compose.yml +++ b/tests/opentelemetry-docker-tests/tests/docker-compose.yml @@ -24,7 +24,7 @@ services: POSTGRES_PASSWORD: testpassword POSTGRES_DB: opentelemetry-tests otredis: - image: redis:4.0-alpine + image: redis/redis-stack:7.2.0-v12 ports: - "127.0.0.1:6379:6379" otrediscluster: diff --git a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py index 481b8d21c8..d02febca10 100644 --- a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py +++ b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py @@ -18,6 +18,15 @@ import redis import redis.asyncio +from redis.exceptions import ResponseError +from redis.commands.search.indexDefinition import IndexDefinition, IndexType +from redis.commands.search.aggregation import AggregateRequest +from redis.commands.search.query import Query +from redis.commands.search.field import ( + TextField, + VectorField, +) + from opentelemetry import trace from opentelemetry.instrumentation.redis import RedisInstrumentor from opentelemetry.semconv.trace import SpanAttributes @@ -614,3 +623,63 @@ def test_get(self): self.assertEqual( span.attributes.get(SpanAttributes.DB_STATEMENT), "GET ?" ) + + +class TestRedisearchInstrument(TestBase): + def setUp(self): + super().setUp() + self.redis_client = redis.Redis(port=6379) + self.redis_client.flushall() + self.embedding_dim = 256 + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + self.prepare_data() + self.create_index() + + def tearDown(self): + RedisInstrumentor().uninstrument() + super().tearDown() + + def prepare_data(self): + try: + self.redis_client.ft("idx:test_vss").dropindex(True) + except ResponseError: + print("No such index") + item = {"name": "test", + "value": "test_value", + "embeddings": [0.1] * 256} + pipeline = self.redis_client.pipeline() + pipeline.json().set(f"test:001", "$", item) + res = pipeline.execute() + assert False not in res + + def create_index(self): + schema = ( + TextField("$.name", no_stem=True, as_name="name"), + TextField("$.value", no_stem=True, as_name="value"), + VectorField("$.embeddings", + "FLAT", + { + "TYPE": "FLOAT32", + "DIM": self.embedding_dim, + "DISTANCE_METRIC": "COSINE", + }, + as_name="vector",), + ) + definition = IndexDefinition(prefix=["test:"], index_type=IndexType.JSON) + res = self.redis_client.ft("idx:test_vss").create_index(fields=schema, definition=definition) + assert "OK" in str(res) + + def test_redis_create_index(self): + spans = self.memory_exporter.get_finished_spans() + span = next(span for span in spans if span.name == "redis.create_index") + assert "redis.create_index.fields" in span.attributes + + def test_redis_query(self): + query = "@name:test" + res = self.redis_client.ft("idx:test_vss").search(Query(query)) + + spans = self.memory_exporter.get_finished_spans() + span = next(span for span in spans if span.name == "redis.search") + + assert span.attributes.get("redis.search.query") == query + assert span.attributes.get("redis.search.total") == 1