Skip to content

Commit

Permalink
fix: add support for missing OpenTelemetry library
Browse files Browse the repository at this point in the history
  • Loading branch information
dalpasso committed Dec 13, 2023
1 parent ddf1377 commit 8cf83ef
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 97 deletions.
30 changes: 12 additions & 18 deletions eodag/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import geojson
import pkg_resources
import yaml.parser
from opentelemetry import trace
from pkg_resources import resource_filename
from whoosh import analysis, fields
from whoosh.fields import Schema
Expand Down Expand Up @@ -1655,26 +1654,21 @@ def _do_search(
total_results = 0

try:
tracer = trace.get_tracer("eodag.tracer")
with tracer.start_as_current_span("core-search") as span:
trace_id = span.get_span_context().trace_id
timer = telemetry.get_overhead_timer(trace_id)
start_time = perf_counter()
trace_id = telemetry.get_current_trace_id()
timer = telemetry.get_overhead_timer(trace_id)
start_time = perf_counter()

if need_auth and auth_plugin and can_authenticate:
search_plugin.auth = auth_plugin.authenticate()
if need_auth and auth_plugin and can_authenticate:
search_plugin.auth = auth_plugin.authenticate()

res, nb_res = search_plugin.query(
count=count, auth=auth_plugin, **kwargs
)
res, nb_res = search_plugin.query(count=count, auth=auth_plugin, **kwargs)

end_time = perf_counter()
total_time = end_time - start_time
telemetry.record_outbound_request_duration(
search_plugin.provider, total_time
)
if timer:
timer.record_subtask_time(total_time)
end_time = perf_counter()
total_time = end_time - start_time
telemetry.record_outbound_request_duration(
search_plugin.provider, total_time
)
timer.record_subtask_time(total_time)

# Only do the pagination computations when it makes sense. For example,
# for a search by id, we can reasonably guess that the provider will return
Expand Down
125 changes: 59 additions & 66 deletions eodag/rest/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.utils import get_openapi
from fastapi.responses import ORJSONResponse, StreamingResponse
from opentelemetry import trace
from pydantic import BaseModel
from starlette.exceptions import HTTPException as StarletteHTTPException

Expand Down Expand Up @@ -399,26 +398,24 @@ def stac_collections_item_download(
collection_id: str, item_id: str, request: Request
) -> StreamingResponse:
"""STAC collection item download"""
tracer = trace.get_tracer("eodag.tracer")
with tracer.start_as_current_span("server-download") as span:
trace_id = span.get_span_context().trace_id
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")

arguments = dict(request.query_params)
provider = arguments.pop("provider", None)

response = download_stac_item_by_id_stream(
catalogs=[collection_id], item_id=item_id, provider=provider, **arguments
)
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")

arguments = dict(request.query_params)
provider = arguments.pop("provider", None)

response = download_stac_item_by_id_stream(
catalogs=[collection_id], item_id=item_id, provider=provider, **arguments
)

timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)
timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)

return response
return response


@router.get(
Expand Down Expand Up @@ -602,28 +599,26 @@ def stac_catalogs_item_download(
catalogs: str, item_id: str, request: Request
) -> StreamingResponse:
"""STAC Catalog item download"""
tracer = trace.get_tracer("eodag.tracer")
with tracer.start_as_current_span("server-download") as span:
trace_id = span.get_span_context().trace_id
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")

arguments = dict(request.query_params)
provider = arguments.pop("provider", None)
arguments = dict(request.query_params)
provider = arguments.pop("provider", None)

list_catalog = catalogs.strip("/").split("/")
list_catalog = catalogs.strip("/").split("/")

response = download_stac_item_by_id_stream(
catalogs=list_catalog, item_id=item_id, provider=provider, **arguments
)
response = download_stac_item_by_id_stream(
catalogs=list_catalog, item_id=item_id, provider=provider, **arguments
)

timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)
timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)

return response
return response


@router.get(
Expand Down Expand Up @@ -774,16 +769,14 @@ def stac_search(
request: Request, search_body: Optional[SearchBody] = None
) -> ORJSONResponse:
"""STAC collections items"""
tracer = trace.get_tracer("eodag.tracer")
with tracer.start_as_current_span("server-search") as span:
trace_id = span.get_span_context().trace_id
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")
logger.debug(f"Body: {search_body}")
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")
logger.debug(f"Body: {search_body}")

url = request.state.url
url_root = request.state.url_root
url = request.state.url
url_root = request.state.url_root

if search_body is None:
body = {}
Expand All @@ -792,29 +785,29 @@ def stac_search(
if body["sortby"] is not None:
body["sortby"] = convert_sortby_to_get_format(body["sortby"])

arguments = dict(request.query_params, **body)
provider = arguments.pop("provider", None)
arguments = dict(request.query_params, **body)
provider = arguments.pop("provider", None)

# metrics
args_collections = arguments.get("collections", None)
product_type = args_collections.split(",")[0] if args_collections else None
telemetry.record_searched_product_type(product_type)
# metrics
args_collections = arguments.get("collections", None)
product_type = args_collections.split(",")[0] if args_collections else None
telemetry.record_searched_product_type(product_type)

response = search_stac_items(
url=url,
arguments=arguments,
root=url_root,
provider=provider,
method=request.method,
)
resp = ORJSONResponse(
content=response, status_code=200, media_type="application/json"
)
timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)
return resp
response = search_stac_items(
url=url,
arguments=arguments,
root=url_root,
provider=provider,
method=request.method,
)
resp = ORJSONResponse(
content=response, status_code=200, media_type="application/json"
)
timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)
return resp


app.include_router(router)
Expand Down
4 changes: 0 additions & 4 deletions eodag/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1342,10 +1342,6 @@ def telemetry_init(app: FastAPI):
:param app: FastAPI to automatically instrument.
:type app: FastAPI"""

if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
return None

telemetry.configure_instruments(eodag_api, app)


Expand Down
42 changes: 33 additions & 9 deletions eodag/utils/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from __future__ import annotations

import logging
import os
import sys
from time import perf_counter
from typing import TYPE_CHECKING, Dict, Iterable

Expand All @@ -37,7 +39,7 @@
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.metrics import CallbackOptions, Instrument, Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
Expand Down Expand Up @@ -100,7 +102,7 @@ def __init__(self):
self._eodag_app: FastAPI = None
self._instruments: Dict[str, Instrument] = {}
self._overhead_timers: Dict[str, OverheadTimer] = {}
self._instrumented: bool = False
self._is_instrumented: bool = False

def configure_instruments(self, eodag_api: EODataAccessGateway, eodag_app: FastAPI):
"""Configure the instrumentation.
Expand All @@ -110,6 +112,12 @@ def configure_instruments(self, eodag_api: EODataAccessGateway, eodag_app: FastA
:param eodag_app: EODAG's app
:type eodag_app: FastAPI
"""

if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
return None
if "opentelemetry" not in sys.modules:
return None

self._eodag_api = eodag_api
self._eodag_app = eodag_app
self._instruments.clear()
Expand Down Expand Up @@ -167,7 +175,7 @@ def configure_instruments(self, eodag_api: EODataAccessGateway, eodag_app: FastA
unit="s",
description="Measure the duration of the outbound HTTP request",
)
self._instrumented = True
self._is_instrumented = True

def _available_providers_callback(
self, options: CallbackOptions
Expand Down Expand Up @@ -212,7 +220,7 @@ def record_downloaded_data(self, provider: str, byte_count: int):
:param byte_count: Number of bytes downloaded.
:type byte_count: int
"""
if not self._instrumented:
if not self._is_instrumented:
return

self._instruments["downloaded_data_bytes_total"].add(
Expand All @@ -226,7 +234,7 @@ def record_searched_product_type(self, product_type: str):
:param product_type: The product type.
:type product_type: str
"""
if not self._instrumented:
if not self._is_instrumented:
return

self._instruments["searched_product_types_total"].add(
Expand All @@ -241,7 +249,7 @@ def record_request_duration(self, provider: str, time: float):
:param time: Duration of the request.
:type time: float
"""
if not self._instrumented:
if not self._is_instrumented:
return

attributes = {"provider": str(provider)}
Expand All @@ -255,7 +263,7 @@ def record_request_overhead_duration(self, provider: str, time: float):
:param time: Duration of the overhead.
:type time: float
"""
if not self._instrumented:
if not self._is_instrumented:
return

attributes = {"provider": str(provider)}
Expand All @@ -269,7 +277,7 @@ def record_outbound_request_duration(self, provider: str, time: float):
:param time: Duration of the request.
:type time: float
"""
if not self._instrumented:
if not self._is_instrumented:
return

attributes = {"provider": str(provider)}
Expand All @@ -286,8 +294,10 @@ def create_overhead_timer(self, timer_id: str) -> OverheadTimer:
:returns: The new timer.
:rtype: OverheadTimer
"""
if not self._is_instrumented:
return OverheadTimer()
timer = OverheadTimer()
self._overhead_timers[timer_id] = OverheadTimer()
self._overhead_timers[timer_id] = timer
return timer

def delete_overhead_timer(self, timer_id: str):
Expand All @@ -296,6 +306,8 @@ def delete_overhead_timer(self, timer_id: str):
:param timer_id: The ID of the timer to delete.
:type timer_id: str
"""
if not self._is_instrumented:
return None
del self._overhead_timers[timer_id]

def get_overhead_timer(self, timer_id: str) -> OverheadTimer:
Expand All @@ -306,7 +318,19 @@ def get_overhead_timer(self, timer_id: str) -> OverheadTimer:
:returns: The timer.
:rtype: OverheadTimer
"""
if not self._is_instrumented:
return OverheadTimer()
return self._overhead_timers[timer_id]

def get_current_trace_id(self):
"""Get the trace ID of the current span
:returns: The trace ID.
:rtype: int
"""
if not self._is_instrumented:
return None
return trace.get_current_span().get_span_context().trace_id


telemetry: Telemetry = Telemetry()

0 comments on commit 8cf83ef

Please sign in to comment.