Skip to content

Commit

Permalink
make configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
brentjr committed Oct 15, 2023
1 parent 7f2162a commit ea864d1
Show file tree
Hide file tree
Showing 5 changed files with 465 additions and 382 deletions.
18 changes: 13 additions & 5 deletions instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ defmodule OpentelemetryOban do
@doc """
Initializes and configures telemetry handlers.
By default jobs and plugins are traced. If you wish to trace only jobs then
use:
Example:
OpentelemetryOban.setup(trace: [:jobs])
OpentelemetryOban.setup(trace: [:jobs])
Options:
* `:trace` - A list of events to trace. Supported values are `:jobs` and `:plugins`,
defaults to [:jobs, :plugins].
* `:time_unit` - a time unit used to convert the timing values, defaults
to `:microsecond`.
Note that if you don't trace plugins, but inside the plugins, there are spans
from other instrumentation libraries (e.g. ecto) then these will still be
Expand All @@ -39,13 +45,15 @@ defmodule OpentelemetryOban do
@spec setup() :: :ok
def setup(opts \\ []) do
trace = Keyword.get(opts, :trace, [:jobs, :plugins])
time_unit = Keyword.get(opts, :time_unit, :microsecond)
config = %{time_unit: time_unit}

if Enum.member?(trace, :jobs) do
OpentelemetryOban.JobHandler.attach()
OpentelemetryOban.JobHandler.attach(config)
end

if Enum.member?(trace, :plugins) do
OpentelemetryOban.PluginHandler.attach()
OpentelemetryOban.PluginHandler.attach(config)
end

:ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ defmodule OpentelemetryOban.JobHandler do

@tracer_id __MODULE__

def attach() do
def attach(config) do
attach_job_start_handler()
attach_job_stop_handler()
attach_job_exception_handler()
attach_job_stop_handler(config)
attach_job_exception_handler(config)
end

defp attach_job_start_handler() do
Expand All @@ -21,21 +21,21 @@ defmodule OpentelemetryOban.JobHandler do
)
end

defp attach_job_stop_handler() do
defp attach_job_stop_handler(config) do
:telemetry.attach(
"#{__MODULE__}.job_stop",
[:oban, :job, :stop],
&__MODULE__.handle_job_stop/4,
[]
config
)
end

defp attach_job_exception_handler() do
defp attach_job_exception_handler(config) do
:telemetry.attach(
"#{__MODULE__}.job_exception",
[:oban, :job, :exception],
&__MODULE__.handle_job_exception/4,
[]
config
)
end

Expand Down Expand Up @@ -83,33 +83,36 @@ defmodule OpentelemetryOban.JobHandler do
})
end

def handle_job_stop(_event, measurements, metadata, _config) do
set_measurements_attributes(measurements)
def handle_job_stop(_event, measurements, metadata, config) do
set_measurements_attributes(measurements, config)
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

def handle_job_exception(
_event,
measurements,
%{stacktrace: stacktrace, error: error} = metadata,
_config
config
) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

# Record exception and mark the span as errored
Span.record_exception(ctx, error, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))

set_measurements_attributes(measurements)
set_measurements_attributes(measurements, config)

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}) do
defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}, %{
time_unit: time_unit
}) do
OpenTelemetry.Tracer.set_attributes(%{
:"messaging.oban.duration_us" => System.convert_time_unit(duration, :native, :microsecond),
:"messaging.oban.queue_time_us" =>
System.convert_time_unit(queue_time, :nanosecond, :microsecond)
:"messaging.oban.duration_#{time_unit}" =>
System.convert_time_unit(duration, :native, time_unit),
:"messaging.oban.queue_time_#{time_unit}" =>
System.convert_time_unit(queue_time, :nanosecond, time_unit)
})
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ defmodule OpentelemetryOban.PluginHandler do

@tracer_id __MODULE__

def attach() do
def attach(config) do
attach_plugin_start_handler()
attach_plugin_stop_handler()
attach_plugin_exception_handler()
attach_plugin_stop_handler(config)
attach_plugin_exception_handler(config)
end

defp attach_plugin_start_handler() do
Expand All @@ -18,21 +18,21 @@ defmodule OpentelemetryOban.PluginHandler do
)
end

defp attach_plugin_stop_handler() do
defp attach_plugin_stop_handler(config) do
:telemetry.attach(
"#{__MODULE__}.plugin_stop",
[:oban, :plugin, :stop],
&__MODULE__.handle_plugin_stop/4,
[]
config
)
end

defp attach_plugin_exception_handler() do
defp attach_plugin_exception_handler(config) do
:telemetry.attach(
"#{__MODULE__}.plugin_exception",
[:oban, :plugin, :exception],
&__MODULE__.handle_plugin_exception/4,
[]
config
)
end

Expand All @@ -45,22 +45,32 @@ defmodule OpentelemetryOban.PluginHandler do
)
end

def handle_plugin_stop(_event, _measurements, metadata, _config) do
def handle_plugin_stop(_event, measurements, metadata, config) do
set_measurements_attributes(measurements, config)
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

def handle_plugin_exception(
_event,
_measurements,
measurements,
%{stacktrace: stacktrace, error: error} = metadata,
_config
config
) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

# Record exception and mark the span as errored
Span.record_exception(ctx, error, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))

set_measurements_attributes(measurements, config)

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
end

defp set_measurements_attributes(%{duration: duration}, %{time_unit: time_unit}) do
OpenTelemetry.Tracer.set_attributes(%{
:"messaging.oban.duration_#{time_unit}" =>
System.convert_time_unit(duration, :native, time_unit)
})
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -24,89 +24,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do
:application.start(:opentelemetry)

TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup()

:ok
end

test "does not create spans when tracing plugins is disabled" do
TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [:jobs])
describe "with the default config" do
setup do
OpentelemetryOban.setup()
end

:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)
test "does not create spans when tracing plugins is disabled" do
TestHelpers.remove_oban_handlers()
OpentelemetryOban.setup(trace: [:jobs])

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
)
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)

refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
end
:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
)

test "records span on plugin execution" do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)
refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
end

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
)
test "records span on plugin execution" do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)

:telemetry.execute(
[:oban, :plugin, :stop],
%{duration: 444},
%{plugin: Elixir.Oban.Plugins.Stager}
)

assert_receive {:span,
span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)}

assert %{
"messaging.oban.duration_microsecond": _duration
} = :otel_attributes.map(attributes)
end

test "records span on plugin error" do
:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)

assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")}
:telemetry.execute(
[:oban, :plugin, :exception],
%{duration: 444},
%{
plugin: Elixir.Oban.Plugins.Stager,
kind: :error,
stacktrace: [
{Some, :error, [], []}
],
error: %UndefinedFunctionError{
arity: 0,
function: :error,
message: nil,
module: Some,
reason: nil
}
}
)

expected_status = OpenTelemetry.status(:error, "")

assert_receive {:span,
span(
name: "Elixir.Oban.Plugins.Stager process",
attributes: attributes,
events: events,
status: ^expected_status
)}

assert %{
"messaging.oban.duration_microsecond": _duration
} = :otel_attributes.map(attributes)

[
event(
name: "exception",
attributes: event_attributes
)
] = :otel_events.list(events)

assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
end
end

test "records span on plugin error" do
test "can configure time_unit" do
OpentelemetryOban.setup(time_unit: :second)

:telemetry.execute(
[:oban, :plugin, :start],
%{system_time: System.system_time()},
%{plugin: Elixir.Oban.Plugins.Stager}
)

:telemetry.execute(
[:oban, :plugin, :exception],
[:oban, :plugin, :stop],
%{duration: 444},
%{
plugin: Elixir.Oban.Plugins.Stager,
kind: :error,
stacktrace: [
{Some, :error, [], []}
],
error: %UndefinedFunctionError{
arity: 0,
function: :error,
message: nil,
module: Some,
reason: nil
}
}
%{plugin: Elixir.Oban.Plugins.Stager}
)

expected_status = OpenTelemetry.status(:error, "")

assert_receive {:span,
span(
name: "Elixir.Oban.Plugins.Stager process",
events: events,
status: ^expected_status
)}

[
event(
name: "exception",
attributes: event_attributes
)
] = :otel_events.list(events)
span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)}

assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
assert %{
"messaging.oban.duration_second": _duration
} = :otel_attributes.map(attributes)
end
end
Loading

0 comments on commit ea864d1

Please sign in to comment.