From ea864d1e8e314a5789537b8cc0dd8668a0bd2676 Mon Sep 17 00:00:00 2001 From: Brent Roberts Date: Sat, 14 Oct 2023 21:06:41 -0600 Subject: [PATCH] make configurable --- .../lib/opentelemetry_oban.ex | 18 +- .../lib/opentelemetry_oban/job_handler.ex | 33 +- .../lib/opentelemetry_oban/plugin_handler.ex | 30 +- .../plugin_handler_test.exs | 158 +++-- .../test/opentelemetry_oban_test.exs | 608 +++++++++--------- 5 files changed, 465 insertions(+), 382 deletions(-) diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index a6af0c08..d3272d54 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -26,10 +26,16 @@ defmodule OpentelemetryOban do @doc """ Initializes and configures telemetry handlers. - By default jobs and plugins are traced. If you wish to trace only jobs then - use: + Example: - OpentelemetryOban.setup(trace: [:jobs]) + OpentelemetryOban.setup(trace: [:jobs]) + + Options: + + * `:trace` - A list of events to trace. Supported values are `:jobs` and `:plugins`, + defaults to [:jobs, :plugins]. + * `:time_unit` - a time unit used to convert the timing values, defaults + to `:microsecond`. Note that if you don't trace plugins, but inside the plugins, there are spans from other instrumentation libraries (e.g. ecto) then these will still be @@ -39,13 +45,15 @@ defmodule OpentelemetryOban do @spec setup() :: :ok def setup(opts \\ []) do trace = Keyword.get(opts, :trace, [:jobs, :plugins]) + time_unit = Keyword.get(opts, :time_unit, :microsecond) + config = %{time_unit: time_unit} if Enum.member?(trace, :jobs) do - OpentelemetryOban.JobHandler.attach() + OpentelemetryOban.JobHandler.attach(config) end if Enum.member?(trace, :plugins) do - OpentelemetryOban.PluginHandler.attach() + OpentelemetryOban.PluginHandler.attach(config) end :ok diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 19a9200f..054cf0c5 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -6,10 +6,10 @@ defmodule OpentelemetryOban.JobHandler do @tracer_id __MODULE__ - def attach() do + def attach(config) do attach_job_start_handler() - attach_job_stop_handler() - attach_job_exception_handler() + attach_job_stop_handler(config) + attach_job_exception_handler(config) end defp attach_job_start_handler() do @@ -21,21 +21,21 @@ defmodule OpentelemetryOban.JobHandler do ) end - defp attach_job_stop_handler() do + defp attach_job_stop_handler(config) do :telemetry.attach( "#{__MODULE__}.job_stop", [:oban, :job, :stop], &__MODULE__.handle_job_stop/4, - [] + config ) end - defp attach_job_exception_handler() do + defp attach_job_exception_handler(config) do :telemetry.attach( "#{__MODULE__}.job_exception", [:oban, :job, :exception], &__MODULE__.handle_job_exception/4, - [] + config ) end @@ -83,8 +83,8 @@ defmodule OpentelemetryOban.JobHandler do }) end - def handle_job_stop(_event, measurements, metadata, _config) do - set_measurements_attributes(measurements) + def handle_job_stop(_event, measurements, metadata, config) do + set_measurements_attributes(measurements, config) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end @@ -92,7 +92,7 @@ defmodule OpentelemetryOban.JobHandler do _event, measurements, %{stacktrace: stacktrace, error: error} = metadata, - _config + config ) do ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) @@ -100,16 +100,19 @@ defmodule OpentelemetryOban.JobHandler do Span.record_exception(ctx, error, stacktrace) Span.set_status(ctx, OpenTelemetry.status(:error, "")) - set_measurements_attributes(measurements) + set_measurements_attributes(measurements, config) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end - defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}) do + defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}, %{ + time_unit: time_unit + }) do OpenTelemetry.Tracer.set_attributes(%{ - :"messaging.oban.duration_us" => System.convert_time_unit(duration, :native, :microsecond), - :"messaging.oban.queue_time_us" => - System.convert_time_unit(queue_time, :nanosecond, :microsecond) + :"messaging.oban.duration_#{time_unit}" => + System.convert_time_unit(duration, :native, time_unit), + :"messaging.oban.queue_time_#{time_unit}" => + System.convert_time_unit(queue_time, :nanosecond, time_unit) }) end end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index dfc94c12..a322b68a 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -3,10 +3,10 @@ defmodule OpentelemetryOban.PluginHandler do @tracer_id __MODULE__ - def attach() do + def attach(config) do attach_plugin_start_handler() - attach_plugin_stop_handler() - attach_plugin_exception_handler() + attach_plugin_stop_handler(config) + attach_plugin_exception_handler(config) end defp attach_plugin_start_handler() do @@ -18,21 +18,21 @@ defmodule OpentelemetryOban.PluginHandler do ) end - defp attach_plugin_stop_handler() do + defp attach_plugin_stop_handler(config) do :telemetry.attach( "#{__MODULE__}.plugin_stop", [:oban, :plugin, :stop], &__MODULE__.handle_plugin_stop/4, - [] + config ) end - defp attach_plugin_exception_handler() do + defp attach_plugin_exception_handler(config) do :telemetry.attach( "#{__MODULE__}.plugin_exception", [:oban, :plugin, :exception], &__MODULE__.handle_plugin_exception/4, - [] + config ) end @@ -45,15 +45,16 @@ defmodule OpentelemetryOban.PluginHandler do ) end - def handle_plugin_stop(_event, _measurements, metadata, _config) do + def handle_plugin_stop(_event, measurements, metadata, config) do + set_measurements_attributes(measurements, config) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end def handle_plugin_exception( _event, - _measurements, + measurements, %{stacktrace: stacktrace, error: error} = metadata, - _config + config ) do ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) @@ -61,6 +62,15 @@ defmodule OpentelemetryOban.PluginHandler do Span.record_exception(ctx, error, stacktrace) Span.set_status(ctx, OpenTelemetry.status(:error, "")) + set_measurements_attributes(measurements, config) + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end + + defp set_measurements_attributes(%{duration: duration}, %{time_unit: time_unit}) do + OpenTelemetry.Tracer.set_attributes(%{ + :"messaging.oban.duration_#{time_unit}" => + System.convert_time_unit(duration, :native, time_unit) + }) + end end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs index 81d596ff..270dbb79 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -24,47 +24,110 @@ defmodule OpentelemetryOban.PluginHandlerTest do :application.start(:opentelemetry) TestHelpers.remove_oban_handlers() - OpentelemetryOban.setup() :ok end - test "does not create spans when tracing plugins is disabled" do - TestHelpers.remove_oban_handlers() - OpentelemetryOban.setup(trace: [:jobs]) + describe "with the default config" do + setup do + OpentelemetryOban.setup() + end - :telemetry.execute( - [:oban, :plugin, :start], - %{system_time: System.system_time()}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + test "does not create spans when tracing plugins is disabled" do + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup(trace: [:jobs]) - :telemetry.execute( - [:oban, :plugin, :stop], - %{duration: 444}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} - end + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - test "records span on plugin execution" do - :telemetry.execute( - [:oban, :plugin, :start], - %{system_time: System.system_time()}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + end - :telemetry.execute( - [:oban, :plugin, :stop], - %{duration: 444}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + test "records span on plugin execution" do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) + + assert_receive {:span, + span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)} + + assert %{ + "messaging.oban.duration_microsecond": _duration + } = :otel_attributes.map(attributes) + end + + test "records span on plugin error" do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + :telemetry.execute( + [:oban, :plugin, :exception], + %{duration: 444}, + %{ + plugin: Elixir.Oban.Plugins.Stager, + kind: :error, + stacktrace: [ + {Some, :error, [], []} + ], + error: %UndefinedFunctionError{ + arity: 0, + function: :error, + message: nil, + module: Some, + reason: nil + } + } + ) + + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "Elixir.Oban.Plugins.Stager process", + attributes: attributes, + events: events, + status: ^expected_status + )} + + assert %{ + "messaging.oban.duration_microsecond": _duration + } = :otel_attributes.map(attributes) + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end end - test "records span on plugin error" do + test "can configure time_unit" do + OpentelemetryOban.setup(time_unit: :second) + :telemetry.execute( [:oban, :plugin, :start], %{system_time: System.system_time()}, @@ -72,41 +135,16 @@ defmodule OpentelemetryOban.PluginHandlerTest do ) :telemetry.execute( - [:oban, :plugin, :exception], + [:oban, :plugin, :stop], %{duration: 444}, - %{ - plugin: Elixir.Oban.Plugins.Stager, - kind: :error, - stacktrace: [ - {Some, :error, [], []} - ], - error: %UndefinedFunctionError{ - arity: 0, - function: :error, - message: nil, - module: Some, - reason: nil - } - } + %{plugin: Elixir.Oban.Plugins.Stager} ) - expected_status = OpenTelemetry.status(:error, "") - assert_receive {:span, - span( - name: "Elixir.Oban.Plugins.Stager process", - events: events, - status: ^expected_status - )} - - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) + span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)} - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + assert %{ + "messaging.oban.duration_second": _duration + } = :otel_attributes.map(attributes) end end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs index 5590966e..411e52e6 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -26,41 +26,68 @@ defmodule OpentelemetryObanTest do :application.start(:opentelemetry) TestHelpers.remove_oban_handlers() - OpentelemetryOban.setup() :ok end - test "records span on job insertion" do - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + describe "with the default config" do + setup do + OpentelemetryOban.setup() + end - assert_receive {:span, - span( - name: "TestJob send", - attributes: attributes, - parent_span_id: :undefined, - kind: :producer, - status: :undefined - )} + test "records span on job insertion" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.worker": "TestJob", - "messaging.system": :oban - } = :otel_attributes.map(attributes) - end + assert_receive {:span, + span( + name: "TestJob send", + attributes: attributes, + parent_span_id: :undefined, + kind: :producer, + status: :undefined + )} + + assert %{ + "messaging.destination": "events", + "messaging.destination_kind": :queue, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.worker": "TestJob", + "messaging.system": :oban + } = :otel_attributes.map(attributes) + end + + test "job creation uses existing trace if present" do + OpenTelemetry.Tracer.with_span "test span" do + ctx = OpenTelemetry.Tracer.current_span_ctx() + root_trace_id = OpenTelemetry.Span.trace_id(ctx) + root_span_id = OpenTelemetry.Span.span_id(ctx) + + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + + assert_receive {:span, + span( + name: "TestJob send", + attributes: _attributes, + trace_id: ^root_trace_id, + parent_span_id: ^root_span_id, + kind: :producer, + status: :undefined + )} + end + end + + test "keeps existing meta information" do + OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"})) - test "job creation uses existing trace if present" do - OpenTelemetry.Tracer.with_span "test span" do - ctx = OpenTelemetry.Tracer.current_span_ctx() - root_trace_id = OpenTelemetry.Span.trace_id(ctx) - root_span_id = OpenTelemetry.Span.span_id(ctx) + assert [job] = all_enqueued() + assert job.meta["foo"] == "bar" + end + test "tracing information is propagated between send and process" do OpentelemetryOban.insert(TestJob.new(%{})) assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) @@ -68,329 +95,326 @@ defmodule OpentelemetryObanTest do span( name: "TestJob send", attributes: _attributes, - trace_id: ^root_trace_id, - parent_span_id: ^root_span_id, + trace_id: send_trace_id, + span_id: send_span_id, kind: :producer, status: :undefined )} - end - end - - test "keeps existing meta information" do - OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"})) - - assert [job] = all_enqueued() - assert job.meta["foo"] == "bar" - end - test "tracing information is propagated between send and process" do - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: process_trace_id, + links: links + )} - assert_receive {:span, - span( - name: "TestJob send", - attributes: _attributes, - trace_id: send_trace_id, - span_id: send_span_id, - kind: :producer, - status: :undefined - )} + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(links) - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: process_trace_id, - links: links - )} + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != process_trace_id + end - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(links) + test "no link is created on process when tracing info was not propagated" do + # Using regular Oban, instead of OpentelemetryOban + Oban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - # Process is ran asynchronously so we create a new trace, but still link - # the traces together. - assert send_trace_id != process_trace_id - end + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: _trace_id, + links: links + )} - test "no link is created on process when tracing info was not propagated" do - # Using regular Oban, instead of OpentelemetryOban - Oban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert [] == :otel_links.list(links) + end - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: _trace_id, - links: links - )} + test "records spans for successful Oban jobs" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert [] == :otel_links.list(links) - end + assert_receive {:span, + span( + name: "TestJob process", + attributes: attributes, + kind: :consumer, + status: :undefined + )} - test "records spans for successful Oban jobs" do - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert %{ + "messaging.destination_kind": :queue, + "messaging.destination": "events", + "messaging.oban.attempt": 1, + "messaging.oban.duration_microsecond": _duration, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.queue_time_microsecond": _queue_time, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJob", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) + end - assert_receive {:span, - span( - name: "TestJob process", - attributes: attributes, - kind: :consumer, - status: :undefined - )} + test "records spans for Oban jobs that stop with {:error, :something}" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{})) + assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) - assert %{ - "messaging.destination_kind": :queue, - "messaging.destination": "events", - "messaging.oban.attempt": 1, - "messaging.oban.duration_us": _duration, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.queue_time_us": _queue_time, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJob", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) - end + expected_status = OpenTelemetry.status(:error, "") - test "records spans for Oban jobs that stop with {:error, :something}" do - OpentelemetryOban.insert(TestJobThatReturnsError.new(%{})) - assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + attributes: attributes, + kind: :consumer, + events: events, + status: ^expected_status + )} - expected_status = OpenTelemetry.status(:error, "") + assert %{ + "messaging.destination_kind": :queue, + "messaging.destination": "events", + "messaging.oban.attempt": 1, + "messaging.oban.duration_microsecond": _duration, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.queue_time_microsecond": _queue_time, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatReturnsError", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - attributes: attributes, - kind: :consumer, - events: events, - status: ^expected_status - )} + test "records spans for each retry" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2)) - assert %{ - "messaging.destination_kind": :queue, - "messaging.destination": "events", - "messaging.oban.attempt": 1, - "messaging.oban.duration_us": _duration, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.queue_time_us": _queue_time, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatReturnsError", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) + assert %{success: 0, failure: 1, discard: 1} = + Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true) - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) + expected_status = OpenTelemetry.status(:error, "") - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - end + assert_receive {:span, + span( + name: "TestJobThatReturnsError send", + trace_id: send_trace_id, + span_id: send_span_id + )} - test "records spans for each retry" do - OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2)) + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: first_process_trace_id, + links: job_1_links + )} - assert %{success: 0, failure: 1, discard: 1} = - Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true) + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) - expected_status = OpenTelemetry.status(:error, "") + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: second_process_trace_id, + links: job_2_links + )} - assert_receive {:span, - span( - name: "TestJobThatReturnsError send", - trace_id: send_trace_id, - span_id: send_span_id - )} + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - status: ^expected_status, - trace_id: first_process_trace_id, - links: job_1_links - )} + assert first_process_trace_id != second_process_trace_id + end - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) + test "records spans for Oban jobs that stop with an exception" do + OpentelemetryOban.insert(TestJobThatThrowsException.new(%{})) + assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - status: ^expected_status, - trace_id: second_process_trace_id, - links: job_2_links - )} + expected_status = OpenTelemetry.status(:error, "") - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) + assert_receive {:span, + span( + name: "TestJobThatThrowsException process", + attributes: attributes, + kind: :consumer, + events: events, + status: ^expected_status + )} - assert first_process_trace_id != second_process_trace_id - end + assert %{ + "messaging.destination": "events", + "messaging.destination_kind": :queue, + "messaging.oban.attempt": 1, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatThrowsException", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end - test "records spans for Oban jobs that stop with an exception" do - OpentelemetryOban.insert(TestJobThatThrowsException.new(%{})) - assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) + test "spans inside the job are associated with the job trace" do + OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - expected_status = OpenTelemetry.status(:error, "") + assert_receive {:span, + span( + name: "TestJobWithInnerSpan process", + kind: :consumer, + trace_id: trace_id, + span_id: process_span_id + )} - assert_receive {:span, - span( - name: "TestJobThatThrowsException process", - attributes: attributes, - kind: :consumer, - events: events, - status: ^expected_status - )} + assert_receive {:span, + span( + name: "span inside the job", + kind: :internal, + trace_id: ^trace_id, + parent_span_id: ^process_span_id + )} + end - assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "messaging.oban.attempt": 1, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatThrowsException", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) + test "OpentelemetryOban.insert!/2 returns job on successful insert" do + %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert_receive {:span, span(name: "TestJob send")} + assert_receive {:span, span(name: "TestJob process")} + end - [ - event( - name: "exception", - attributes: event_attributes + test "OpentelemetryOban.insert!/2 raises an error on failed insert" do + assert_raise( + Ecto.InvalidChangesetError, + fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end ) - ] = :otel_events.list(events) - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - end + assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events) - test "spans inside the job are associated with the job trace" do - OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + expected_status = OpenTelemetry.status(:error, "") - assert_receive {:span, - span( - name: "TestJobWithInnerSpan process", - kind: :consumer, - trace_id: trace_id, - span_id: process_span_id - )} + assert_receive {:span, + span( + name: "TestJob send", + events: events, + status: ^expected_status + )} - assert_receive {:span, - span( - name: "span inside the job", - kind: :internal, - trace_id: ^trace_id, - parent_span_id: ^process_span_id - )} - end + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) - test "OpentelemetryOban.insert!/2 returns job on successful insert" do - %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, span(name: "TestJob send")} - assert_receive {:span, span(name: "TestJob process")} - end + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - test "OpentelemetryOban.insert!/2 raises an error on failed insert" do - assert_raise( - Ecto.InvalidChangesetError, - fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end - ) + refute_received {:span, span(name: "TestJob process")} + end - assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events) + test "tracing information is propagated when using insert_all/2" do + OpentelemetryOban.insert_all([ + TestJob.new(%{}), + TestJob.new(%{}) + ]) - expected_status = OpenTelemetry.status(:error, "") + assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, - span( - name: "TestJob send", - events: events, - status: ^expected_status - )} + assert_receive {:span, + span( + name: :"Oban bulk insert", + attributes: _attributes, + trace_id: send_trace_id, + span_id: send_span_id, + kind: :producer, + status: :undefined + )} - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: first_process_trace_id, + links: job_1_links + )} - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) - refute_received {:span, span(name: "TestJob process")} - end + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: second_process_trace_id, + links: job_2_links + )} - test "tracing information is propagated when using insert_all/2" do - OpentelemetryOban.insert_all([ - TestJob.new(%{}), - TestJob.new(%{}) - ]) + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) - assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != first_process_trace_id + assert send_trace_id != second_process_trace_id + assert first_process_trace_id != second_process_trace_id + end - assert_receive {:span, - span( - name: :"Oban bulk insert", - attributes: _attributes, - trace_id: send_trace_id, - span_id: send_span_id, - kind: :producer, - status: :undefined - )} + test "works with Oban.Testing.perform_job helper function" do + Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: first_process_trace_id, - links: job_1_links - )} + assert_receive {:span, span(name: "TestJob process")} + end + end - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) + test "can configure time_unit" do + OpentelemetryOban.setup(time_unit: :second) + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) assert_receive {:span, span( name: "TestJob process", - attributes: _attributes, + attributes: attributes, kind: :consumer, - status: :undefined, - trace_id: second_process_trace_id, - links: job_2_links + status: :undefined )} - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) - - # Process is ran asynchronously so we create a new trace, but still link - # the traces together. - assert send_trace_id != first_process_trace_id - assert send_trace_id != second_process_trace_id - assert first_process_trace_id != second_process_trace_id - end - - test "works with Oban.Testing.perform_job helper function" do - Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) - - assert_receive {:span, span(name: "TestJob process")} + assert %{ + "messaging.oban.duration_second": _duration, + "messaging.oban.queue_time_second": _queue_time + } = :otel_attributes.map(attributes) end end