Skip to content

Commit

Permalink
Trace all Oban events
Browse files Browse the repository at this point in the history
  • Loading branch information
danschultzer committed Dec 15, 2024
1 parent 8b9491d commit 350dc46
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 13 deletions.
16 changes: 9 additions & 7 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ defmodule OpentelemetryOban do
@doc """
Initializes and configures telemetry handlers.
By default jobs and plugins are traced. If you wish to trace only jobs then
use:
By default everything is traced. If you wish to trace only jobs then use:
OpentelemetryOban.setup(trace: [:jobs])
Note that if you don't trace plugins, but inside the plugins, there are spans
from other instrumentation libraries (e.g. ecto) then these will still be
traced. This setting controls only the spans that are created by
opentelemetry_oban.
Note that if you don't trace plugins or internal, there will be spans from
other instrumentation libraries (e.g. ecto) that would be traced. This setting
controls only the spans that are created by opentelemetry_oban.
"""
@spec setup() :: :ok
def setup(opts \\ []) do
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
trace = Keyword.get(opts, :trace, [:jobs, :plugins, :internal])

if Enum.member?(trace, :jobs) do
OpentelemetryOban.JobHandler.attach()
Expand All @@ -48,6 +46,10 @@ defmodule OpentelemetryOban do
OpentelemetryOban.PluginHandler.attach()
end

if Enum.member?(trace, :internal) do
OpentelemetryOban.InternalHandler.attach()
end

:ok
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule OpentelemetryOban.InternalHandler do
alias OpenTelemetry.Span
alias OpenTelemetry.SemanticConventions.Trace

require Trace

@tracer_id __MODULE__

def attach() do
:telemetry.attach_many(
{__MODULE__, :internal},
Enum.flat_map(
[
[:engine, :init],
[:engine, :refresh],
[:engine, :put_meta],
[:engine, :check_available],
[:engine, :cancel_all_jobs],
[:engine, :fetch_jobs],
[:engine, :insert_all_jobs],
[:engine, :prune_all_jobs],
[:engine, :stage_jobs],
[:engine, :cancel_job],
[:engine, :complete_job],
[:engine, :discard_job],
[:engine, :error_job],
[:engine, :insert_job],
[:engine, :snooze_job],
[:notifier, :notify],
[:peer, :election]
],
fn event ->
[
[:oban | event ++ [:start]],
[:oban | event ++ [:stop]],
[:oban | event ++ [:exception]]
]
end
),
&__MODULE__.handle_oban_event/4,
[]
)
end

def handle_oban_event(event, _measurements, metadata, _config) do
[op | rest] = Enum.reverse(event)

case op do
:start ->
OpentelemetryTelemetry.start_telemetry_span(__MODULE__, Enum.join(Enum.reverse(rest), "."), metadata, %{kind: :consumer})

:stop ->
OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)

:exception ->
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

Span.record_exception(ctx, metadata.reason, metadata.stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, Exception.message(metadata.reason)))

OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_start_handler() do
:telemetry.attach(
"#{__MODULE__}.job_start",
{__MODULE__, [:job, :start]},
[:oban, :job, :start],
&__MODULE__.handle_job_start/4,
[]
Expand All @@ -23,7 +23,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_stop_handler() do
:telemetry.attach(
"#{__MODULE__}.job_stop",
{__MODULE__, [:job, :stop]},
[:oban, :job, :stop],
&__MODULE__.handle_job_stop/4,
[]
Expand All @@ -32,7 +32,7 @@ defmodule OpentelemetryOban.JobHandler do

defp attach_job_exception_handler() do
:telemetry.attach(
"#{__MODULE__}.job_exception",
{__MODULE__, [:job, :exception]},
[:oban, :job, :exception],
&__MODULE__.handle_job_exception/4,
[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_start_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_start",
{__MODULE__, [:plugin, :start]},
[:oban, :plugin, :start],
&__MODULE__.handle_plugin_start/4,
[]
Expand All @@ -21,7 +21,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_stop_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_stop",
{__MODULE__, [:plugin, :stop]},
[:oban, :plugin, :stop],
&__MODULE__.handle_plugin_stop/4,
[]
Expand All @@ -30,7 +30,7 @@ defmodule OpentelemetryOban.PluginHandler do

defp attach_plugin_exception_handler() do
:telemetry.attach(
"#{__MODULE__}.plugin_exception",
{__MODULE__, [:plugin, :exception]},
[:oban, :plugin, :exception],
&__MODULE__.handle_plugin_exception/4,
[]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
defmodule OpentelemetryOban.InternalHandlerTest do
use DataCase

require OpenTelemetry.Tracer
require OpenTelemetry.Span
require Record

for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do
Record.defrecord(name, spec)
end

for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do
Record.defrecord(name, spec)
end

@events [
[:engine, :init],
[:engine, :refresh],
[:engine, :put_meta],
[:engine, :check_available],
[:engine, :cancel_all_jobs],
[:engine, :fetch_jobs],
[:engine, :insert_all_jobs],
[:engine, :prune_all_jobs],
[:engine, :stage_jobs],
[:engine, :cancel_job],
[:engine, :complete_job],
[:engine, :discard_job],
[:engine, :error_job],
[:engine, :insert_job],
[:engine, :snooze_job],
[:notifier, :notify],
[:peer, :election]
]

setup do
:application.stop(:opentelemetry)
:application.set_env(:opentelemetry, :tracer, :otel_tracer_default)

:application.set_env(:opentelemetry, :processors, [
{:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}}
])

:application.start(:opentelemetry)

TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [:internal])

:ok
end

test "does not create spans when internal tracing is disabled" do
TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [])

execute_internal_event([:peer, :election])

refute_receive {:span, span(name: "oban.peer.election")}
end

test "records span on internal execution" do
execute_internal_event([:peer, :election])

assert_receive {:span, span(name: "oban.peer.election")}
end

test "records span on error" do
:telemetry.execute(
[:oban, :peer, :election, :start],
%{system_time: System.system_time()},
%{}
)

exception = %UndefinedFunctionError{
arity: 0,
function: :error,
message: nil,
module: Some,
reason: nil
}

:telemetry.execute(
[:oban, :peer, :election, :exception],
%{duration: 444},
%{
kind: :error,
stacktrace: [
{Some, :error, [], []}
],
reason: exception
}
)

expected_status = OpenTelemetry.status(:error, Exception.message(exception))

assert_receive {:span,
span(
name: "oban.peer.election",
events: events,
status: ^expected_status
)}

[
event(
name: :exception,
attributes: event_attributes
)
] = :otel_events.list(events)

assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end

for event <- @events do
test "#{inspect([:oban | event])} spans" do
execute_internal_event(unquote(event))

assert_receive {:span, span(name: "oban.#{unquote(Enum.join(event, "."))}")}

:ok
end
end

defp execute_internal_event(event) do
:telemetry.execute(
[:oban | event ++ [:start]],
%{system_time: System.system_time()},
%{}
)

:telemetry.execute(
[:oban | event ++ [:stop]],
%{duration: 42069},
%{}
)
end
end

0 comments on commit 350dc46

Please sign in to comment.