diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index 7a4a41a5..af43f90e 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -26,19 +26,17 @@ defmodule OpentelemetryOban do @doc """ Initializes and configures telemetry handlers. - By default jobs and plugins are traced. If you wish to trace only jobs then - use: + By default everything is traced. If you wish to trace only jobs then use: OpentelemetryOban.setup(trace: [:jobs]) - Note that if you don't trace plugins, but inside the plugins, there are spans - from other instrumentation libraries (e.g. ecto) then these will still be - traced. This setting controls only the spans that are created by - opentelemetry_oban. + Note that if you don't trace plugins or internal, there will be spans from + other instrumentation libraries (e.g. ecto) that would be traced. This setting + controls only the spans that are created by opentelemetry_oban. """ @spec setup() :: :ok def setup(opts \\ []) do - trace = Keyword.get(opts, :trace, [:jobs, :plugins]) + trace = Keyword.get(opts, :trace, [:jobs, :plugins, :internal]) if Enum.member?(trace, :jobs) do OpentelemetryOban.JobHandler.attach() @@ -48,6 +46,10 @@ defmodule OpentelemetryOban do OpentelemetryOban.PluginHandler.attach() end + if Enum.member?(trace, :internal) do + OpentelemetryOban.InternalHandler.attach() + end + :ok end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/internal_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/internal_handler.ex new file mode 100644 index 00000000..47deaaf1 --- /dev/null +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/internal_handler.ex @@ -0,0 +1,64 @@ +defmodule OpentelemetryOban.InternalHandler do + alias OpenTelemetry.Span + alias OpenTelemetry.SemanticConventions.Trace + + require Trace + + @tracer_id __MODULE__ + + def attach() do + :telemetry.attach_many( + {__MODULE__, :internal}, + Enum.flat_map( + [ + [:engine, :init], + [:engine, :refresh], + [:engine, :put_meta], + [:engine, :check_available], + [:engine, :cancel_all_jobs], + [:engine, :fetch_jobs], + [:engine, :insert_all_jobs], + [:engine, :prune_all_jobs], + [:engine, :stage_jobs], + [:engine, :cancel_job], + [:engine, :complete_job], + [:engine, :discard_job], + [:engine, :error_job], + [:engine, :insert_job], + [:engine, :snooze_job], + [:notifier, :notify], + [:peer, :election] + ], + fn event -> + [ + [:oban | event ++ [:start]], + [:oban | event ++ [:stop]], + [:oban | event ++ [:exception]] + ] + end + ), + &__MODULE__.handle_oban_event/4, + [] + ) + end + + def handle_oban_event(event, _measurements, metadata, _config) do + [op | rest] = Enum.reverse(event) + + case op do + :start -> + OpentelemetryTelemetry.start_telemetry_span(__MODULE__, Enum.join(Enum.reverse(rest), "."), metadata, %{kind: :consumer}) + + :stop -> + OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata) + + :exception -> + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) + + Span.record_exception(ctx, metadata.reason, metadata.stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, Exception.message(metadata.reason))) + + OpentelemetryTelemetry.end_telemetry_span(__MODULE__, metadata) + end + end +end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 8d14f781..2a1f584b 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -14,7 +14,7 @@ defmodule OpentelemetryOban.JobHandler do defp attach_job_start_handler() do :telemetry.attach( - "#{__MODULE__}.job_start", + {__MODULE__, [:job, :start]}, [:oban, :job, :start], &__MODULE__.handle_job_start/4, [] @@ -23,7 +23,7 @@ defmodule OpentelemetryOban.JobHandler do defp attach_job_stop_handler() do :telemetry.attach( - "#{__MODULE__}.job_stop", + {__MODULE__, [:job, :stop]}, [:oban, :job, :stop], &__MODULE__.handle_job_stop/4, [] @@ -32,7 +32,7 @@ defmodule OpentelemetryOban.JobHandler do defp attach_job_exception_handler() do :telemetry.attach( - "#{__MODULE__}.job_exception", + {__MODULE__, [:job, :exception]}, [:oban, :job, :exception], &__MODULE__.handle_job_exception/4, [] diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index e77b8cb3..48c91360 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -12,7 +12,7 @@ defmodule OpentelemetryOban.PluginHandler do defp attach_plugin_start_handler() do :telemetry.attach( - "#{__MODULE__}.plugin_start", + {__MODULE__, [:plugin, :start]}, [:oban, :plugin, :start], &__MODULE__.handle_plugin_start/4, [] @@ -21,7 +21,7 @@ defmodule OpentelemetryOban.PluginHandler do defp attach_plugin_stop_handler() do :telemetry.attach( - "#{__MODULE__}.plugin_stop", + {__MODULE__, [:plugin, :stop]}, [:oban, :plugin, :stop], &__MODULE__.handle_plugin_stop/4, [] @@ -30,7 +30,7 @@ defmodule OpentelemetryOban.PluginHandler do defp attach_plugin_exception_handler() do :telemetry.attach( - "#{__MODULE__}.plugin_exception", + {__MODULE__, [:plugin, :exception]}, [:oban, :plugin, :exception], &__MODULE__.handle_plugin_exception/4, [] diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/internal_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/internal_handler_test.exs new file mode 100644 index 00000000..958455ed --- /dev/null +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/internal_handler_test.exs @@ -0,0 +1,137 @@ +defmodule OpentelemetryOban.InternalHandlerTest do + use DataCase + + require OpenTelemetry.Tracer + require OpenTelemetry.Span + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry_api/include/opentelemetry.hrl") do + Record.defrecord(name, spec) + end + + @events [ + [:engine, :init], + [:engine, :refresh], + [:engine, :put_meta], + [:engine, :check_available], + [:engine, :cancel_all_jobs], + [:engine, :fetch_jobs], + [:engine, :insert_all_jobs], + [:engine, :prune_all_jobs], + [:engine, :stage_jobs], + [:engine, :cancel_job], + [:engine, :complete_job], + [:engine, :discard_job], + [:engine, :error_job], + [:engine, :insert_job], + [:engine, :snooze_job], + [:notifier, :notify], + [:peer, :election] + ] + + setup do + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1, exporter: {:otel_exporter_pid, self()}}} + ]) + + :application.start(:opentelemetry) + + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup(trace: [:internal]) + + :ok + end + + test "does not create spans when internal tracing is disabled" do + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup(trace: []) + + execute_internal_event([:peer, :election]) + + refute_receive {:span, span(name: "oban.peer.election")} + end + + test "records span on internal execution" do + execute_internal_event([:peer, :election]) + + assert_receive {:span, span(name: "oban.peer.election")} + end + + test "records span on error" do + :telemetry.execute( + [:oban, :peer, :election, :start], + %{system_time: System.system_time()}, + %{} + ) + + exception = %UndefinedFunctionError{ + arity: 0, + function: :error, + message: nil, + module: Some, + reason: nil + } + + :telemetry.execute( + [:oban, :peer, :election, :exception], + %{duration: 444}, + %{ + kind: :error, + stacktrace: [ + {Some, :error, [], []} + ], + reason: exception + } + ) + + expected_status = OpenTelemetry.status(:error, Exception.message(exception)) + + assert_receive {:span, + span( + name: "oban.peer.election", + events: events, + status: ^expected_status + )} + + [ + event( + name: :exception, + attributes: event_attributes + ) + ] = :otel_events.list(events) + + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end + + for event <- @events do + test "#{inspect([:oban | event])} spans" do + execute_internal_event(unquote(event)) + + assert_receive {:span, span(name: "oban.#{unquote(Enum.join(event, "."))}")} + + :ok + end + end + + defp execute_internal_event(event) do + :telemetry.execute( + [:oban | event ++ [:start]], + %{system_time: System.system_time()}, + %{} + ) + + :telemetry.execute( + [:oban | event ++ [:stop]], + %{duration: 42069}, + %{} + ) + end +end