Skip to content

Commit

Permalink
Update __init__.py
Browse files Browse the repository at this point in the history
  • Loading branch information
rahuldimri authored Feb 1, 2023
1 parent 1874501 commit ac0e4e0
Showing 1 changed file with 113 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,111 +11,79 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This library allows tracing HTTP elasticsearch requests made by the
`elasticsearch <https://elasticsearch-py.readthedocs.io/en/master/>`_ library.
Usage
-----
.. code-block:: python
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
import elasticsearch
# instrument elasticsearch
ElasticsearchInstrumentor().instrument()
# Using elasticsearch as normal now will automatically generate spans
es = elasticsearch.Elasticsearch()
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
es.get(index='my-index', doc_type='my-type', id=1)
Elasticsearch instrumentation prefixes operation names with the string "Elasticsearch". This
can be changed to a different string by either setting the `OTEL_PYTHON_ELASTICSEARCH_NAME_PREFIX`
environment variable or by passing the prefix as an argument to the instrumentor. For example,
.. code-block:: python
ElasticsearchInstrumentor("my-custom-prefix").instrument()
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is:
def request_hook(span: Span, method: str, url: str, kwargs)
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
this function signature is:
def response_hook(span: Span, response: dict)
for example:
.. code-block:: python
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
import elasticsearch
def request_hook(span, method, url, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
def response_hook(span, response):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
# instrument elasticsearch with request and response hooks
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
# Using elasticsearch as normal now will automatically generate spans,
# including user custom attributes added from the hooks
es = elasticsearch.Elasticsearch()
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
es.get(index='my-index', doc_type='my-type', id=1)
API
---
"""

import re
from logging import getLogger
from os import environ
from typing import Collection

import elasticsearch
import elasticsearch.exceptions
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry.instrumentation.elasticsearch.package import _instruments
from opentelemetry.instrumentation.elasticsearch.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

# Module-level logger, named after this module.
logger = getLogger(__name__)


# Values to add as tags from the actual
# payload returned by Elasticsearch, if any.
# Each key found in a dict response is recorded on the span as
# "elasticsearch.<key>" with its value converted to a string.
_ATTRIBUTES_FROM_RESULT = [
"found",
"timed_out",
"took",
]

# Fallback operation name appended to the span-name prefix when a call
# provides neither a URL nor a method.
_DEFAULT_OP_NAME = "request"


class ElasticsearchInstrumentor(BaseInstrumentor):
"""An instrumentor for elasticsearch
See `BaseInstrumentor`
"""

def __init__(self, span_name_prefix=None):
if not span_name_prefix:
span_name_prefix = environ.get(
Expand All @@ -124,80 +92,108 @@ def __init__(self, span_name_prefix=None):
)
self._span_name_prefix = span_name_prefix.strip()
super().__init__()

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
"""
Instruments elasticsearch module
Instruments elasticsearch module
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
@@ -143,49 +143,124 @@ def _instrument(self, **kwargs):
tracer, self._span_name_prefix, request_hook, response_hook
),
)
_wrap(
elasticsearch,
"AsyncTransport.perform_request",
_wrap_perform_request(
_wrap_perform_async_request(
tracer, self._span_name_prefix, request_hook, response_hook
),
)

def _uninstrument(self, **kwargs):
unwrap(elasticsearch.Transport, "perform_request")
unwrap(elasticsearch.AsyncTransport, "perform_request")


# Matches a "/_doc/<id>" path segment; group 1 captures the document ID
# (every character up to the next "/").
_regex_doc_url = re.compile(r"/_doc/([^/]+)")

# search api https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
# Matches "/<target>/_search" with an optional trailing slash; group 1
# captures the path segment immediately preceding "/_search".
_regex_search_url = re.compile(r"/([^/]+)/_search[/]?")

def _extract(args, kwargs, span_name_prefix):
method = url = None
try:
method, url, *_ = args
except IndexError:
logger.warning(
"expected perform_request to receive two positional arguments. "
"Got %d",
len(args),
)
op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)
doc_id = None
if url:
# TODO: This regex-based solution avoids creating an unbounded number of span names,
# but should be replaced by instrumenting individual Elasticsearch methods instead of
# Transport.perform_request()
# A limitation of the regex is that only the '_doc' mapping type is supported.
# Mapping types are deprecated since Elasticsearch 7
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
match = _regex_doc_url.search(url)
if match is not None:
# Remove the full document ID from the URL
doc_span = match.span()
op_name = (
span_name_prefix
+ url[: doc_span[0]]
+ "/_doc/:id"
+ url[doc_span[1] :]
)
# Put the document ID in attributes
doc_id = match.group(1)
params = kwargs.get("params", {})
body = kwargs.get("body", None)
return method, url, op_name, body, params, doc_id


def _set_span_attributes(span, url, method, body, params, doc_id):
    """Record the request details of an Elasticsearch call on ``span``.

    The DB system attribute is always set. Each of ``url``, ``method``,
    ``body``, ``params`` and ``doc_id`` is recorded only when truthy;
    ``body`` and ``params`` are stored as their ``str()`` form.
    """
    # (attribute key, raw value, converter or None), in emission order.
    optional_fields = (
        ("elasticsearch.url", url, None),
        ("elasticsearch.method", method, None),
        (SpanAttributes.DB_STATEMENT, body, str),
        ("elasticsearch.params", params, str),
        ("elasticsearch.id", doc_id, None),
    )

    # Resolve every value before touching the span, then emit in one pass.
    resolved = {SpanAttributes.DB_SYSTEM: "elasticsearch"}
    for attr_key, raw_value, convert in optional_fields:
        if not raw_value:
            continue
        resolved[attr_key] = raw_value if convert is None else convert(raw_value)

    for attr_key, attr_value in resolved.items():
        span.set_attribute(attr_key, attr_value)


def _set_span_attributes_from_rv(span, return_value):
    """Copy selected response fields onto ``span``.

    For every key listed in ``_ATTRIBUTES_FROM_RESULT`` that is present in
    ``return_value``, sets ``elasticsearch.<key>`` to the stringified value.
    """
    present_fields = (
        field for field in _ATTRIBUTES_FROM_RESULT if field in return_value
    )
    for field in present_fields:
        attribute_name = f"elasticsearch.{field}"
        span.set_attribute(attribute_name, str(return_value[field]))


def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
):
# pylint: disable=R0912,R0914
# pylint: disable=R0912
def wrapper(wrapped, _, args, kwargs):
method = url = None
try:
method, url, *_ = args
except IndexError:
logger.warning(
"expected perform_request to receive two positional arguments. "
"Got %d",
len(args),
)

op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)

doc_id = None
search_target = None

if url:
# TODO: This regex-based solution avoids creating an unbounded number of span names, but should be replaced by instrumenting individual Elasticsearch methods instead of Transport.perform_request()
# A limitation of the regex is that only the '_doc' mapping type is supported. Mapping types are deprecated since Elasticsearch 7
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
match = _regex_doc_url.search(url)
if match is not None:
# Remove the full document ID from the URL
doc_span = match.span()
op_name = (
span_name_prefix
+ url[: doc_span[0]]
+ "/_doc/:id"
+ url[doc_span[1] :]
)
# Put the document ID in attributes
doc_id = match.group(1)
match = _regex_search_url.search(url)
if match is not None:
op_name = span_name_prefix + "/<target>/_search"
search_target = match.group(1)

params = kwargs.get("params", {})
body = kwargs.get("body", None)


method, url, op_name, body, params, doc_id = _extract(
args, kwargs, span_name_prefix
)
with tracer.start_as_current_span(
op_name,
kind=SpanKind.CLIENT,
Expand All @@ -207,35 +203,45 @@ def wrapper(wrapped, _, args, kwargs):
request_hook(span, method, url, kwargs)

if span.is_recording():
attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
}
if url:
attributes["elasticsearch.url"] = url
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
attributes["elasticsearch.id"] = doc_id
if search_target:
attributes["elasticsearch.target"] = search_target
for key, value in attributes.items():
span.set_attribute(key, value)

rv = wrapped(*args, **kwargs)
if isinstance(rv, dict) and span.is_recording():
for member in _ATTRIBUTES_FROM_RESULT:
if member in rv:
span.set_attribute(
f"elasticsearch.{member}",
str(rv[member]),
)
_set_span_attributes(span, url, method, body, params, doc_id)

return_value = wrapped(*args, **kwargs)
if isinstance(return_value, dict) and span.is_recording():
_set_span_attributes_from_rv(span, return_value)

if callable(response_hook):
response_hook(span, return_value)
return return_value

return wrapper


def _wrap_perform_async_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
):
# pylint: disable=R0912
async def wrapper(wrapped, _, args, kwargs):
method, url, op_name, body, params, doc_id = _extract(
args, kwargs, span_name_prefix
)

with tracer.start_as_current_span(
op_name,
@@ -196,33 +271,14 @@ def wrapper(wrapped, _, args, kwargs):
request_hook(span, method, url, kwargs)

if span.is_recording():

_set_span_attributes(span, url, method, body, params, doc_id)

return_value = await wrapped(*args, **kwargs)
if isinstance(return_value, dict) and span.is_recording():
_set_span_attributes_from_rv(span, return_value)

if callable(response_hook):
response_hook(span, rv)
return rv
response_hook(span, return_value)
return return_value

return wrapper


0 comments on commit ac0e4e0

Please sign in to comment.