Skip to content

Commit

Permalink
Handle redis.exceptions.WatchError as a non-error event in instrument…
Browse files Browse the repository at this point in the history
…ation (#2668)
  • Loading branch information
zhihali authored Jul 15, 2024
1 parent 432d6f5 commit 7567efa
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2682](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2682))

### Fixed

- Handle `redis.exceptions.WatchError` as a non-error event in redis instrumentation
([#2668](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2668))
- `opentelemetry-instrumentation-httpx` Ensure httpx.get or httpx.request like methods are instrumented
([#2538](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2538))
- Add Python 3.12 support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def response_hook(span, instance, response):
from opentelemetry.instrumentation.redis.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.trace import Span, StatusCode

_DEFAULT_SERVICE = "redis"

Expand Down Expand Up @@ -212,9 +212,16 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
span.set_attribute(
"db.redis.pipeline_length", len(command_stack)
)
response = func(*args, **kwargs)

response = None
try:
response = func(*args, **kwargs)
except redis.WatchError:
span.set_status(StatusCode.UNSET)

if callable(response_hook):
response_hook(span, instance, response)

return response

pipeline_class = (
Expand Down Expand Up @@ -281,7 +288,13 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs):
span.set_attribute(
"db.redis.pipeline_length", len(command_stack)
)
response = await func(*args, **kwargs)

response = None
try:
response = await func(*args, **kwargs)
except redis.WatchError:
span.set_status(StatusCode.UNSET)

if callable(response_hook):
response_hook(span, instance, response)
return response
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
asgiref==3.7.2
async-timeout==4.0.3
Deprecated==1.2.14
fakeredis==2.23.3
importlib-metadata==6.11.0
iniconfig==2.0.0
packaging==24.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from unittest import mock
from unittest import IsolatedAsyncioTestCase, mock
from unittest.mock import AsyncMock

import fakeredis
import pytest
import redis
import redis.asyncio
from fakeredis.aioredis import FakeRedis
from redis.exceptions import ConnectionError as redis_ConnectionError
from redis.exceptions import WatchError

from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
Expand Down Expand Up @@ -311,3 +316,113 @@ def test_attributes_unix_socket(self):
span.attributes[SpanAttributes.NET_TRANSPORT],
NetTransportValues.OTHER.value,
)

def test_connection_error(self):
server = fakeredis.FakeServer()
server.connected = False
redis_client = fakeredis.FakeStrictRedis(server=server)
try:
redis_client.set("foo", "bar")
except redis_ConnectionError:
pass

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]

self.assertEqual(span.name, "SET")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.ERROR)

def test_response_error(self):
redis_client = fakeredis.FakeStrictRedis()
redis_client.lpush("mylist", "value")
try:
redis_client.incr(
"mylist"
) # Trying to increment a list, which is invalid
except redis.ResponseError:
pass

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)

span = spans[0]
self.assertEqual(span.name, "LPUSH")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)

span = spans[1]
self.assertEqual(span.name, "INCRBY")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.ERROR)

def test_watch_error_sync(self):
def redis_operations():
try:
redis_client = fakeredis.FakeStrictRedis()
pipe = redis_client.pipeline(transaction=True)
pipe.watch("a")
redis_client.set("a", "bad") # This will cause the WatchError
pipe.multi()
pipe.set("a", "1")
pipe.execute()
except WatchError:
pass

redis_operations()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)

# there should be 3 tests, we start watch operation and have 2 set operation on same key
self.assertEqual(len(spans), 3)

self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?")
self.assertEqual(spans[0].kind, SpanKind.CLIENT)
self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET)

for span in spans[1:]:
self.assertEqual(span.attributes.get("db.statement"), "SET ? ?")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)


class TestRedisAsync(TestBase, IsolatedAsyncioTestCase):
def setUp(self):
super().setUp()
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

@pytest.mark.asyncio
async def test_watch_error_async(self):
async def redis_operations():
try:
redis_client = FakeRedis()
async with redis_client.pipeline(transaction=False) as pipe:
await pipe.watch("a")
await redis_client.set("a", "bad")
pipe.multi()
await pipe.set("a", "1")
await pipe.execute()
except WatchError:
pass

await redis_operations()

spans = self.memory_exporter.get_finished_spans()

# there should be 3 tests, we start watch operation and have 2 set operation on same key
self.assertEqual(len(spans), 3)

self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?")
self.assertEqual(spans[0].kind, SpanKind.CLIENT)
self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET)

for span in spans[1:]:
self.assertEqual(span.attributes.get("db.statement"), "SET ? ?")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)

0 comments on commit 7567efa

Please sign in to comment.