diff --git a/CHANGELOG.md b/CHANGELOG.md index b6882cbe57..7bf02de0e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2682](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2682)) ### Fixed - +- Handle `redis.exceptions.WatchError` as a non-error event in redis instrumentation + ([#2668](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2668)) - `opentelemetry-instrumentation-httpx` Ensure httpx.get or httpx.request like methods are instrumented ([#2538](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2538)) - Add Python 3.12 support diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index 1d61e8cfd3..c5f19fc736 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -106,7 +106,7 @@ def response_hook(span, instance, response): from opentelemetry.instrumentation.redis.version import __version__ from opentelemetry.instrumentation.utils import unwrap from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import Span +from opentelemetry.trace import Span, StatusCode _DEFAULT_SERVICE = "redis" @@ -212,9 +212,16 @@ def _traced_execute_pipeline(func, instance, args, kwargs): span.set_attribute( "db.redis.pipeline_length", len(command_stack) ) - response = func(*args, **kwargs) + + response = None + try: + response = func(*args, **kwargs) + except redis.WatchError: + span.set_status(StatusCode.UNSET) + if callable(response_hook): response_hook(span, instance, response) + return response pipeline_class = ( @@ -281,7 +288,13 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs): span.set_attribute( "db.redis.pipeline_length", len(command_stack) ) - response = await func(*args, **kwargs) + + response = None + try: + response = await func(*args, **kwargs) + except redis.WatchError: + span.set_status(StatusCode.UNSET) + if callable(response_hook): response_hook(span, instance, response) return response diff --git a/instrumentation/opentelemetry-instrumentation-redis/test-requirements.txt b/instrumentation/opentelemetry-instrumentation-redis/test-requirements.txt index 1731de082a..43d4bd9788 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/test-requirements.txt +++ b/instrumentation/opentelemetry-instrumentation-redis/test-requirements.txt @@ -1,6 +1,7 @@ asgiref==3.7.2 async-timeout==4.0.3 Deprecated==1.2.14 +fakeredis==2.23.3 importlib-metadata==6.11.0 iniconfig==2.0.0 packaging==24.0 diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index 4a2fce5026..23d21b6e5a 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -12,11 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from unittest import mock +from unittest import IsolatedAsyncioTestCase, mock from unittest.mock import AsyncMock +import fakeredis +import pytest import redis import redis.asyncio +from fakeredis.aioredis import FakeRedis +from redis.exceptions import ConnectionError as redis_ConnectionError +from redis.exceptions import WatchError from opentelemetry import trace from opentelemetry.instrumentation.redis import RedisInstrumentor @@ -311,3 +316,113 @@ def test_attributes_unix_socket(self): span.attributes[SpanAttributes.NET_TRANSPORT], NetTransportValues.OTHER.value, ) + + def test_connection_error(self): + server = fakeredis.FakeServer() + server.connected = False + redis_client = fakeredis.FakeStrictRedis(server=server) + try: + redis_client.set("foo", "bar") + except redis_ConnectionError: + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "SET") + self.assertEqual(span.kind, SpanKind.CLIENT) + self.assertEqual(span.status.status_code, trace.StatusCode.ERROR) + + def test_response_error(self): + redis_client = fakeredis.FakeStrictRedis() + redis_client.lpush("mylist", "value") + try: + redis_client.incr( + "mylist" + ) # Trying to increment a list, which is invalid + except redis.ResponseError: + pass + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + + span = spans[0] + self.assertEqual(span.name, "LPUSH") + self.assertEqual(span.kind, SpanKind.CLIENT) + self.assertEqual(span.status.status_code, trace.StatusCode.UNSET) + + span = spans[1] + self.assertEqual(span.name, "INCRBY") + self.assertEqual(span.kind, SpanKind.CLIENT) + self.assertEqual(span.status.status_code, trace.StatusCode.ERROR) + + def test_watch_error_sync(self): + def redis_operations(): + try: + redis_client = fakeredis.FakeStrictRedis() + pipe = redis_client.pipeline(transaction=True) + pipe.watch("a") + redis_client.set("a", "bad") # This will cause the WatchError + pipe.multi() + pipe.set("a", "1") + pipe.execute() + except WatchError: + pass + + redis_operations() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 3) + + # there should be 3 tests, we start watch operation and have 2 set operation on same key + self.assertEqual(len(spans), 3) + + self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?") + self.assertEqual(spans[0].kind, SpanKind.CLIENT) + self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET) + + for span in spans[1:]: + self.assertEqual(span.attributes.get("db.statement"), "SET ? ?") + self.assertEqual(span.kind, SpanKind.CLIENT) + self.assertEqual(span.status.status_code, trace.StatusCode.UNSET) + + +class TestRedisAsync(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + + def tearDown(self): + super().tearDown() + RedisInstrumentor().uninstrument() + + @pytest.mark.asyncio + async def test_watch_error_async(self): + async def redis_operations(): + try: + redis_client = FakeRedis() + async with redis_client.pipeline(transaction=False) as pipe: + await pipe.watch("a") + await redis_client.set("a", "bad") + pipe.multi() + await pipe.set("a", "1") + await pipe.execute() + except WatchError: + pass + + await redis_operations() + + spans = self.memory_exporter.get_finished_spans() + + # there should be 3 tests, we start watch operation and have 2 set operation on same key + self.assertEqual(len(spans), 3) + + self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?") + self.assertEqual(spans[0].kind, SpanKind.CLIENT) + self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET) + + for span in spans[1:]: + self.assertEqual(span.attributes.get("db.statement"), "SET ? ?") + self.assertEqual(span.kind, SpanKind.CLIENT) + self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)