Skip to content

Commit

Permalink
feat(traces): add v1/traces HTTP endpoint to handle `ExportTraceSer…
Browse files Browse the repository at this point in the history
…viceRequest` (#1968)

feat(traces): add `v1/traces` HTTP endpoint to handle `ExportTraceServiceRequest`
  • Loading branch information
RogerHYang authored Dec 21, 2023
1 parent d5e5a85 commit 3c94dea
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/phoenix/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from phoenix.server.api.schema import schema
from phoenix.server.evaluation_handler import EvaluationHandler
from phoenix.server.span_handler import SpanHandler
from phoenix.server.trace_handler import TraceHandler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -185,11 +186,11 @@ def create_app(
else [
Route(
"/v1/spans",
type(
"SpanEndpoint",
(SpanHandler,),
{"queue": traces},
),
type("SpanEndpoint", (SpanHandler,), {"queue": traces}),
),
Route(
"/v1/traces",
type("TraceEndpoint", (TraceHandler,), {"queue": traces}),
),
]
)
Expand Down
56 changes: 56 additions & 0 deletions src/phoenix/server/trace_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import gzip
import zlib
from typing import Protocol

from google.protobuf.message import DecodeError
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
)
from opentelemetry.proto.trace.v1.trace_pb2 import Span
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import Response
from starlette.status import HTTP_415_UNSUPPORTED_MEDIA_TYPE, HTTP_422_UNPROCESSABLE_ENTITY


class SupportsPutSpan(Protocol):
def put(self, span: Span) -> None:
...


class TraceHandler(HTTPEndpoint):
queue: SupportsPutSpan

async def post(self, request: Request) -> Response:
content_type = request.headers.get("content-type")
if content_type != "application/x-protobuf":
return Response(
content=f"Unsupported content type: {content_type}",
status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE,
)
content_encoding = request.headers.get("content-encoding")
if content_encoding and content_encoding not in ("gzip", "deflate"):
return Response(
content=f"Unsupported content encoding: {content_encoding}",
status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE,
)
body = await request.body()
if content_encoding == "gzip":
body = gzip.decompress(body)
elif content_encoding == "deflate":
body = zlib.decompress(body)
req = ExportTraceServiceRequest()
try:
req.ParseFromString(body)
except DecodeError:
return Response(
content="Request body is invalid ExportTraceServiceRequest",
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
)
for resource_spans in req.resource_spans:
for scope_span in resource_spans.scope_spans:
for span in scope_span.spans:
self.queue.put(span)
await asyncio.sleep(0)
return Response()

0 comments on commit 3c94dea

Please sign in to comment.