Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): add e2e useful tracing #2037

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,16 @@ jobs:
run: echo "INFRAHUB_DB_BACKUP_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV
- name: Select vmagent port
run: echo "VMAGENT_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV

- name: Enable tracing
run: echo "INFRAHUB_TRACE_ENABLE=true" >> $GITHUB_ENV
- name: Set tracing configuration
run: echo "INFRAHUB_TRACE_INSECURE=false" >> $GITHUB_ENV
- name: Set tracing configuration
run: echo "INFRAHUB_TRACE_EXPORTER_ENDPOINT=${{ secrets.TRACING_ENDPOINT }}" >> $GITHUB_ENV
- name: Set tracing configuration
run: echo "OTEL_RESOURCE_ATTRIBUTES=github.run_id=${GITHUB_RUN_ID}" >> $GITHUB_ENV

- name: "Store start time"
run: echo TEST_START_TIME=$(date +%s)000 >> $GITHUB_ENV

Expand Down
13 changes: 12 additions & 1 deletion backend/infrahub/cli/git_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from prometheus_client import start_http_server
from rich.logging import RichHandler

from infrahub import config
from infrahub import __version__, config
from infrahub.components import ComponentType
from infrahub.core.initialization import initialization
from infrahub.database import InfrahubDatabase, get_db
Expand All @@ -20,6 +20,7 @@
from infrahub.services import InfrahubServices
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import configure_trace

app = typer.Typer()

Expand Down Expand Up @@ -66,6 +67,16 @@ async def _start(debug: bool, port: int) -> None:
client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log)
await client.branch.all()

# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-git-agent",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

# Initialize the lock
initialize_lock()

Expand Down
29 changes: 0 additions & 29 deletions backend/infrahub/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,35 +279,6 @@ class TraceSettings(BaseSettings):
default=TraceTransportProtocol.GRPC, description="Protocol to be used for exporting traces"
)
exporter_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint for exporting traces")
exporter_port: Optional[int] = Field(
default=None, ge=1, le=65535, description="Specified if running on a non default port (4317)"
)

@property
def service_port(self) -> int:
if self.exporter_protocol == TraceTransportProtocol.GRPC:
default_port = 4317
elif self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF:
default_port = 4318
else:
default_port = 4317

return self.exporter_port or default_port

@property
def trace_endpoint(self) -> Optional[str]:
if not self.exporter_endpoint:
return None
if self.insecure:
scheme = "http://"
else:
scheme = "https://"
endpoint = str(self.exporter_endpoint) + ":" + str(self.service_port)

if self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF:
endpoint += "/v1/traces"

return scheme + endpoint


@dataclass
Expand Down
21 changes: 14 additions & 7 deletions backend/infrahub/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Record,
)
from neo4j.exceptions import ClientError, Neo4jError, ServiceUnavailable, TransientError
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config
Expand Down Expand Up @@ -186,17 +187,23 @@ async def close(self):
async def execute_query(
self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> List[Record]:
with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
return [item async for item in response]
with trace.get_tracer(__name__).start_as_current_span("execute_db_query") as span:
span.set_attribute("query", query)

with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
return [item async for item in response]

async def execute_query_with_metadata(
self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> Tuple[List[Record], Dict[str, Any]]:
with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
results = [item async for item in response]
return results, response._metadata or {}
with trace.get_tracer(__name__).start_as_current_span("execute_db_query_with_metadata") as span:
span.set_attribute("query", query)

with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
results = [item async for item in response]
return results, response._metadata or {}

async def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> AsyncResult:
if self.is_transaction:
Expand Down
26 changes: 15 additions & 11 deletions backend/infrahub/graphql/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from graphql.utilities import (
get_operation_ast,
)
from opentelemetry import trace
from starlette.datastructures import UploadFile
from starlette.requests import HTTPConnection, Request
from starlette.responses import JSONResponse, Response
Expand Down Expand Up @@ -222,17 +223,20 @@ async def _handle_http_request(
"query_id": "",
}

with GRAPHQL_DURATION_METRICS.labels(**labels).time():
result = await graphql(
schema=graphql_params.schema,
source=query,
context_value=graphql_params.context,
root_value=self.root_value,
middleware=self.middleware,
variable_values=variable_values,
operation_name=operation_name,
execution_context_class=self.execution_context_class,
)
with trace.get_tracer(__name__).start_as_current_span("execute_graphql") as span:
span.set_attributes(labels)

with GRAPHQL_DURATION_METRICS.labels(**labels).time():
result = await graphql(
schema=graphql_params.schema,
source=query,
context_value=graphql_params.context,
root_value=self.root_value,
middleware=self.middleware,
variable_values=variable_values,
operation_name=operation_name,
execution_context_class=self.execution_context_class,
)

response: Dict[str, Any] = {"data": result.data}
if result.errors:
Expand Down
2 changes: 2 additions & 0 deletions backend/infrahub/graphql/mutations/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pydantic
from graphene import Boolean, Field, InputObjectType, List, Mutation, String
from infrahub_sdk.utils import extract_fields, extract_fields_first_node
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config, lock
Expand Down Expand Up @@ -55,6 +56,7 @@ class Arguments:

@classmethod
@retry_db_transaction(name="branch_create")
@trace.get_tracer(__name__).start_as_current_span("branch_create")
async def mutate(
cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution: bool = False
) -> Self:
Expand Down
36 changes: 22 additions & 14 deletions backend/infrahub/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from infrahub_sdk.timestamp import TimestampFormatError
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span
from pydantic import ValidationError
from starlette_exporter import PrometheusMiddleware, handle_metrics

Expand All @@ -34,7 +34,7 @@
from infrahub.services import InfrahubServices, services
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer
from infrahub.trace import add_span_exception, configure_trace, get_traceid
from infrahub.worker import WORKER_IDENTITY


Expand All @@ -44,9 +44,10 @@ async def app_initialization(application: FastAPI) -> None:
# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-server",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.trace_endpoint,
exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

Expand Down Expand Up @@ -95,8 +96,13 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
redoc_url="/api/redoc",
)

FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics")
tracer = get_tracer()

def server_request_hook(span: Span, scope: dict) -> None:  # pylint: disable=unused-argument
    """FastAPI instrumentation hook: tag each incoming request span with this worker's identity.

    Registered via FastAPIInstrumentor().instrument_app(..., server_request_hook=...), so it runs
    once per HTTP request. The ASGI `scope` argument is required by the hook signature but unused.
    """
    # Guard: the instrumentor may pass no span, or a non-recording (sampled-out) one;
    # setting attributes in those cases would be wasted work.
    if span and span.is_recording():
        span.set_attribute("worker", WORKER_IDENTITY)


FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook)

FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend"))
FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets"
Expand All @@ -118,15 +124,17 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
clear_log_context()
request_id = correlation_id.get()
with tracer.start_as_current_span(f"processing request {request_id}"):
trace_id = get_traceid()
set_log_data(key="request_id", value=request_id)
set_log_data(key="app", value="infrahub.api")
set_log_data(key="worker", value=WORKER_IDENTITY)
if trace_id:
set_log_data(key="trace_id", value=trace_id)
response = await call_next(request)
return response

set_log_data(key="request_id", value=request_id)
set_log_data(key="app", value="infrahub.api")
set_log_data(key="worker", value=WORKER_IDENTITY)

trace_id = get_traceid()
if trace_id:
set_log_data(key="trace_id", value=trace_id)

response = await call_next(request)
return response


@app.middleware("http")
Expand Down
29 changes: 29 additions & 0 deletions backend/infrahub/services/adapters/message_bus/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from typing import TYPE_CHECKING, Awaitable, Callable, List, MutableMapping, Optional, Type, TypeVar

import aio_pika
import opentelemetry.instrumentation.aio_pika.span_builder
import ujson
from infrahub_sdk import UUIDT
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.semconv.trace import SpanAttributes

from infrahub import config
from infrahub.components import ComponentType
Expand All @@ -24,6 +27,7 @@
AbstractQueue,
AbstractRobustConnection,
)
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder

from infrahub.config import BrokerSettings
from infrahub.services import InfrahubServices
Expand All @@ -32,6 +36,29 @@
ResponseClass = TypeVar("ResponseClass")


AioPikaInstrumentor().instrument()


# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved
def patch_spanbuilder_set_channel() -> None:
    """Monkeypatch opentelemetry's aio_pika ``SpanBuilder.set_channel`` for aio_pika >= 9.1.

    The default SpanBuilder.set_channel does not work with aio_pika 9.1 and the refactored connection
    attribute
    """

    def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None:
        # aio_pika 9.1 exposes the connection through the private `_connection`
        # attribute; when it is absent, skip silently and set no peer attributes.
        if hasattr(channel, "_connection"):
            url = channel._connection.url
            self._attributes.update(
                {
                    SpanAttributes.NET_PEER_NAME: url.host,
                    SpanAttributes.NET_PEER_PORT: url.port,
                }
            )

    # Replace the upstream implementation in place so every instrumented channel
    # created afterwards uses the fixed attribute lookup.
    opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel  # type: ignore


async def _add_request_id(message: InfrahubMessage) -> None:
    """Propagate the current request_id from the logging context onto the outgoing message metadata."""
    message.meta.request_id = get_log_data().get("request_id", "")
Expand All @@ -54,6 +81,8 @@ def __init__(self, settings: Optional[BrokerSettings] = None) -> None:
self.futures: MutableMapping[str, asyncio.Future] = {}

async def initialize(self, service: InfrahubServices) -> None:
patch_spanbuilder_set_channel()

self.service = service
self.connection = await aio_pika.connect_robust(
host=self.settings.address,
Expand Down
25 changes: 18 additions & 7 deletions backend/infrahub/trace.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GRPCSpanExporter,
Expand All @@ -10,9 +12,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import StatusCode


def get_tracer(name: str = "infrahub") -> trace.Tracer:
return trace.get_tracer(name)
from infrahub.worker import WORKER_IDENTITY


def get_current_span_with_context() -> trace.Span:
Expand Down Expand Up @@ -55,7 +55,7 @@ def add_span_exception(exception: Exception) -> None:


def create_tracer_provider(
version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
) -> TracerProvider:
# Create a BatchSpanProcessor exporter based on the type
if exporter_type == "console":
Expand All @@ -70,8 +70,19 @@ def create_tracer_provider(
else:
raise ValueError("Exporter type unsupported by Infrahub")

extra_attributes = {}
if os.getenv("OTEL_RESOURCE_ATTRIBUTES"):
extra_attributes = dict(attr.split("=") for attr in os.getenv("OTEL_RESOURCE_ATTRIBUTES").split(","))

# Resource can be required for some backends, e.g. Jaeger
resource = Resource(attributes={"service.name": "infrahub", "service.version": version})
resource = Resource(
attributes={
"service.name": service,
"service.version": version,
"worker.id": WORKER_IDENTITY,
**extra_attributes,
}
)
span_processor = BatchSpanProcessor(exporter)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(span_processor)
Expand All @@ -80,16 +91,16 @@ def create_tracer_provider(


def configure_trace(
version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
service: str, version: str, exporter_type: str, exporter_endpoint: str | None = None, exporter_protocol: str = None
) -> None:
# Create a trace provider with the exporter
tracer_provider = create_tracer_provider(
service=service,
version=version,
exporter_type=exporter_type,
exporter_endpoint=exporter_endpoint,
exporter_protocol=exporter_protocol,
)
tracer_provider.get_tracer(__name__)

# Register the trace provider
trace.set_tracer_provider(tracer_provider)
Loading
Loading